aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar janakr <janakr@google.com>2018-08-13 16:13:42 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-08-13 16:15:46 -0700
commit6ae05b419922193c4c253e51c9a5e483e4f947fa (patch)
treee06491f807689124cad73a2011af2db821e390e1
parentce170540ba5401e926a5433e6e35b1d22426e525 (diff)
Order Skyframe evaluations in a priority queue, with all children of a given node having the same priority, later enqueueings having higher priority, re-enqueued nodes having highest priority, and new root nodes having lowest priority. Experimentally, this can save significant RAM (1.4G in some builds!) while not affecting speed.
Also do a semi-drive-by deleting ExecutorFactory parameter to AbstractQueueVisitor, since it was always AbstractQueueVisitor.EXECUTOR_FACTORY. PiperOrigin-RevId: 208560889
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java111
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java61
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java2
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java1
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java4
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java92
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java44
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java17
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java31
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java19
-rw-r--r--src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java1
-rw-r--r--src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java1
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java3
-rw-r--r--src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java11
14 files changed, 203 insertions, 195 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index c4e526ba97..1eb64979d2 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -14,15 +14,16 @@
package com.google.devtools.build.lib.concurrent;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -32,27 +33,6 @@ import java.util.logging.Logger;
/** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */
public class AbstractQueueVisitor implements QuiescingExecutor {
-
- /**
- * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link
- * ThreadPoolExecutor}s this creates have the same value for {@code corePoolSize} and {@code
- * maximumPoolSize} because that results in a fixed-size thread pool, and the current use cases
- * for {@link AbstractQueueVisitor} don't require any more sophisticated thread pool size
- * management.
- *
- * <p>If client use cases change, they may invoke one of the {@link
- * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link
- * ThreadPoolExecutor}.
- */
- public static final Function<ExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY =
- p ->
- new ThreadPoolExecutor(
- /*corePoolSize=*/ p.getParallelism(),
- /*maximumPoolSize=*/ p.getParallelism(),
- p.getKeepAliveTime(),
- p.getUnits(),
- p.getWorkQueue(),
- new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build());
/**
* The most severe unhandled exception thrown by a worker thread, according to {@link
* #errorClassifier}. This exception gets propagated to the calling thread of {@link
@@ -104,6 +84,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private final ExecutorService executorService;
+ private final boolean usingPriorityQueue;
+
/**
* Flag used to record when the main thread (the thread which called {@link #awaitQuiescence}) is
* interrupted.
@@ -131,21 +113,55 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName());
+ /**
+ * Default function for constructing {@link ThreadPoolExecutor}s. The {@link ThreadPoolExecutor}s
+ * this creates have the same value for {@code corePoolSize} and {@code maximumPoolSize} because
+ * that results in a fixed-size thread pool, and the current use cases for {@link
+ * AbstractQueueVisitor} don't require any more sophisticated thread pool size management.
+ *
+ * <p>If client use cases change, they may invoke one of the {@link
+ * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link
+ * ThreadPoolExecutor}.
+ */
private static ExecutorService createExecutorService(
int parallelism,
long keepAliveTime,
TimeUnit units,
+ BlockingQueue<Runnable> workQueue,
+ String poolName) {
+ return new ThreadPoolExecutor(
+ /*corePoolSize=*/ parallelism,
+ /*maximumPoolSize=*/ parallelism,
+ keepAliveTime,
+ units,
+ workQueue,
+ new ThreadFactoryBuilder()
+ .setNameFormat(Preconditions.checkNotNull(poolName) + " %d")
+ .build());
+ }
+
+ /**
+ * Creates an {@link AbstractQueueVisitor}, similar to {@link #AbstractQueueVisitor(int, long,
+ * TimeUnit, boolean, String, ErrorClassifier)}, but whose work is ordered by a {@link
+ * PriorityBlockingQueue}. The {@link Runnable} objects submitted to {@link #execute(Runnable)}
+ * must implement {@link Comparable}.
+ */
+ public static AbstractQueueVisitor createWithPriorityQueue(
+ int parallelism,
+ long keepAliveTime,
+ TimeUnit units,
+ boolean failFastOnException,
String poolName,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
- return Preconditions.checkNotNull(executorFactory)
- .apply(
- new ExecutorParams(
- parallelism,
- keepAliveTime,
- units,
- Preconditions.checkNotNull(poolName),
- new BlockingStack<Runnable>()));
+ ErrorClassifier errorClassifier) {
+ return new AbstractQueueVisitor(
+ createExecutorService(
+ parallelism, keepAliveTime, units, new PriorityBlockingQueue<>(), poolName),
+ true,
+ failFastOnException,
+ errorClassifier,
+ /*usingPriorityQueue=*/ true);
}
+
/**
* Create the {@link AbstractQueueVisitor}.
*
@@ -157,7 +173,6 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
* @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
* null}, default thread naming will be used.
- * @param executorFactory the factory for constructing the executor service.
* @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
*/
public AbstractQueueVisitor(
@@ -166,10 +181,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
TimeUnit units,
boolean failFastOnException,
String poolName,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory,
ErrorClassifier errorClassifier) {
this(
- createExecutorService(parallelism, keepAliveTime, units, poolName, executorFactory),
+ createExecutorService(parallelism, keepAliveTime, units, new BlockingStack<>(), poolName),
true,
failFastOnException,
errorClassifier);
@@ -190,10 +204,25 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
boolean shutdownOnCompletion,
boolean failFastOnException,
ErrorClassifier errorClassifier) {
+ this(
+ executorService,
+ shutdownOnCompletion,
+ failFastOnException,
+ errorClassifier,
+ /*usingPriorityQueue=*/ false);
+ }
+
+ private AbstractQueueVisitor(
+ ExecutorService executorService,
+ boolean shutdownOnCompletion,
+ boolean failFastOnException,
+ ErrorClassifier errorClassifier,
+ boolean usingPriorityQueue) {
this.failFastOnException = failFastOnException;
this.ownExecutorService = shutdownOnCompletion;
this.executorService = Preconditions.checkNotNull(executorService);
this.errorClassifier = Preconditions.checkNotNull(errorClassifier);
+ this.usingPriorityQueue = usingPriorityQueue;
}
@Override
@@ -218,6 +247,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
/** Schedules a call. Called in a worker thread. */
@Override
public final void execute(Runnable runnable) {
+ if (usingPriorityQueue) {
+ Preconditions.checkState(runnable instanceof Comparable);
+ }
WrappedRunnable wrappedRunnable = new WrappedRunnable(runnable);
try {
// It's impossible for this increment to result in remainingTasks.get <= 0 because
@@ -239,7 +271,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- protected void executeRunnable(Runnable runnable) {
+ protected void executeRunnable(WrappedRunnable runnable) {
executorService.execute(runnable);
}
@@ -309,7 +341,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* <li>And, lastly, calls {@link #decrementRemainingTasks}.
* </ul>
*/
- private final class WrappedRunnable implements Runnable {
+ protected final class WrappedRunnable implements Runnable, Comparable<WrappedRunnable> {
private final Runnable originalRunnable;
private volatile boolean ran;
@@ -345,6 +377,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compareTo(WrappedRunnable o) {
+ // This should only be called when the concrete class is submitting comparable runnables.
+ return ((Comparable) originalRunnable).compareTo(o.originalRunnable);
+ }
}
private void addJob(Thread thread) {
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java b/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java
deleted file mode 100644
index c06144c8df..0000000000
--- a/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2014 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.google.devtools.build.lib.concurrent;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/** Configuration parameters for {@link ExecutorService} construction. */
-public class ExecutorParams {
- private final int parallelism;
- private final long keepAliveTime;
- private final TimeUnit units;
- private final String poolName;
- private final BlockingQueue<Runnable> workQueue;
-
- public ExecutorParams(
- int parallelism,
- long keepAliveTime,
- TimeUnit units,
- String poolName,
- BlockingQueue<Runnable> workQueue) {
- this.parallelism = parallelism;
- this.keepAliveTime = keepAliveTime;
- this.units = units;
- this.poolName = poolName;
- this.workQueue = workQueue;
- }
-
- public int getParallelism() {
- return parallelism;
- }
-
- public long getKeepAliveTime() {
- return keepAliveTime;
- }
-
- public TimeUnit getUnits() {
- return units;
- }
-
- public String getPoolName() {
- return poolName;
- }
-
- public BlockingQueue<Runnable> getWorkQueue() {
- return workQueue;
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
index 005d4f1ac9..89a7454f0f 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
@@ -92,7 +92,7 @@ public class ForkJoinQuiescingExecutor extends AbstractQueueVisitor {
}
@Override
- protected void executeRunnable(Runnable runnable) {
+ protected void executeRunnable(WrappedRunnable runnable) {
if (ForkJoinTask.inForkJoinPool()) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError = ForkJoinTask.adapt(runnable).fork();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
index af06cde70e..b65538ccb9 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
@@ -249,7 +249,6 @@ final class LabelVisitor {
TimeUnit.SECONDS,
!keepGoing,
THREAD_NAME,
- AbstractQueueVisitor.EXECUTOR_FACTORY,
ErrorClassifier.DEFAULT);
this.eventHandler = eventHandler;
this.maxDepth = maxDepth;
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java
index f8f21f254e..268db07014 100644
--- a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java
@@ -249,7 +249,9 @@ public abstract class AbstractExceptionalParallelEvaluator<E extends Exception>
// This must be equivalent to the code in enqueueChild above, in order to be thread-safe.
switch (entry.addReverseDepAndCheckIfDone(null)) {
case NEEDS_SCHEDULING:
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Low priority because this node is not needed by any other currently evaluating node.
+ // So keep it at the back of the queue as long as there's other useful work to be done.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MIN_VALUE);
break;
case DONE:
informProgressReceiverThatValueIsDone(skyKey, entry);
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
index 04fcdeae91..1b1cd432c7 100644
--- a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -53,6 +54,15 @@ import javax.annotation.Nullable;
* <p>This does not implement other parts of Skyframe evaluation setup and post-processing, such as
* translating a set of requested top-level nodes into actions, or constructing an evaluation
* result. Derived classes should do this.
+ *
+ * <p>Work is prioritized in a depth-first fashion when using the {@link
+ * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler,
+ * EmittedEventState, EventFilter, ErrorInfoManager, boolean, int, DirtyTrackingProgressReceiver,
+ * GraphInconsistencyReceiver, CycleDetector)} constructor, but not when using the {@link
+ * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler,
+ * EmittedEventState, EventFilter, ErrorInfoManager, boolean, DirtyTrackingProgressReceiver,
+ * GraphInconsistencyReceiver, ForkJoinPool, CycleDetector, EvaluationVersionBehavior)} (the
+ * constructor with a {@link ForkJoinPool}).
*/
public abstract class AbstractParallelEvaluator {
private static final Logger logger = Logger.getLogger(AbstractParallelEvaluator.class.getName());
@@ -60,6 +70,7 @@ public abstract class AbstractParallelEvaluator {
final ProcessableGraph graph;
final ParallelEvaluatorContext evaluatorContext;
protected final CycleDetector cycleDetector;
+ @Nullable private final AtomicInteger globalEnqueuedIndex;
AbstractParallelEvaluator(
ProcessableGraph graph,
@@ -87,9 +98,10 @@ public abstract class AbstractParallelEvaluator {
progressReceiver,
storedEventFilter,
errorInfoManager,
- Evaluate::new,
+ (skyKey, evaluationPriority) -> new Evaluate(evaluationPriority, skyKey),
graphInconsistencyReceiver,
threadCount);
+ this.globalEnqueuedIndex = new AtomicInteger(0);
}
AbstractParallelEvaluator(
@@ -119,10 +131,11 @@ public abstract class AbstractParallelEvaluator {
progressReceiver,
storedEventFilter,
errorInfoManager,
- Evaluate::new,
+ (skyKey, evaluationPriority) -> new Evaluate(0, skyKey),
graphInconsistencyReceiver,
Preconditions.checkNotNull(forkJoinPool),
evaluationVersionBehavior);
+ this.globalEnqueuedIndex = null;
}
/**
@@ -141,21 +154,40 @@ public abstract class AbstractParallelEvaluator {
NEEDS_EVALUATION
}
- /** An action that evaluates a value. */
- private class Evaluate implements Runnable {
+ /**
+ * An action that evaluates a value.
+ *
+ * <p>{@link Comparable} for use in priority queues. Experimentally, grouping enqueued evaluations
+ * together by parent leads to fewer in-flight evaluations and thus lower peak memory usage. Thus
+ * we store the {@link #evaluationPriority} (coming from the {@link #globalEnqueuedIndex} and use
+ * it for comparisons: later enqueuings should be evaluated earlier, to do a depth-first search,
+ * except for re-enqueued nodes, which always get top priority.
+ *
+ * <p>This is not applicable when using a {@link ForkJoinPool}, since it does not allow for easy
+ * work prioritization.
+ */
+ private class Evaluate implements ParallelEvaluatorContext.ComparableRunnable {
+ private final int evaluationPriority;
/** The name of the value to be evaluated. */
private final SkyKey skyKey;
- private Evaluate(SkyKey skyKey) {
+ private Evaluate(int evaluationPriority, SkyKey skyKey) {
+ this.evaluationPriority = evaluationPriority;
this.skyKey = skyKey;
}
+ @Override
+ public int compareTo(ParallelEvaluatorContext.ComparableRunnable o) {
+ return -1 * Integer.compare(this.evaluationPriority, ((Evaluate) o).evaluationPriority);
+ }
+
private void enqueueChild(
SkyKey skyKey,
NodeEntry entry,
SkyKey child,
NodeEntry childEntry,
- boolean depAlreadyExists)
+ boolean depAlreadyExists,
+ int childEvaluationPriority)
throws InterruptedException {
Preconditions.checkState(!entry.isDone(), "%s %s", skyKey, entry);
DependencyState dependencyState =
@@ -166,13 +198,15 @@ public abstract class AbstractParallelEvaluator {
case DONE:
if (entry.signalDep(childEntry.getVersion())) {
// This can only happen if there are no more children to be added.
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Maximum priority, since this node has already started evaluation before, and we want
+ // it off our plate.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE);
}
break;
case ALREADY_EVALUATING:
break;
case NEEDS_SCHEDULING:
- evaluatorContext.getVisitor().enqueueEvaluation(child);
+ evaluatorContext.getVisitor().enqueueEvaluation(child, childEvaluationPriority);
break;
}
}
@@ -288,7 +322,10 @@ public abstract class AbstractParallelEvaluator {
unknownStatusDeps);
continue;
}
- handleKnownChildrenForDirtyNode(unknownStatusDeps, state);
+ handleKnownChildrenForDirtyNode(
+ unknownStatusDeps,
+ state,
+ globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0);
return DirtyOutcome.ALREADY_PROCESSED;
}
switch (state.getDirtyState()) {
@@ -321,7 +358,8 @@ public abstract class AbstractParallelEvaluator {
}
}
- private void handleKnownChildrenForDirtyNode(Collection<SkyKey> knownChildren, NodeEntry state)
+ private void handleKnownChildrenForDirtyNode(
+ Collection<SkyKey> knownChildren, NodeEntry state, int childEvaluationPriority)
throws InterruptedException {
Map<SkyKey, ? extends NodeEntry> oldChildren =
graph.getBatch(skyKey, Reason.ENQUEUING_CHILD, knownChildren);
@@ -342,7 +380,8 @@ public abstract class AbstractParallelEvaluator {
state,
recreatedEntry.getKey(),
recreatedEntry.getValue(),
- /*depAlreadyExists=*/ false);
+ /*depAlreadyExists=*/ false,
+ childEvaluationPriority);
}
}
for (Map.Entry<SkyKey, ? extends NodeEntry> e : oldChildren.entrySet()) {
@@ -350,7 +389,13 @@ public abstract class AbstractParallelEvaluator {
NodeEntry directDepEntry = e.getValue();
// TODO(bazel-team): If this signals the current node and makes it ready, consider
// evaluating it in this thread instead of scheduling a new evaluation.
- enqueueChild(skyKey, state, directDep, directDepEntry, /*depAlreadyExists=*/ true);
+ enqueueChild(
+ skyKey,
+ state,
+ directDep,
+ directDepEntry,
+ /*depAlreadyExists=*/ true,
+ childEvaluationPriority);
}
}
@@ -381,7 +426,8 @@ public abstract class AbstractParallelEvaluator {
} catch (UndonePreviouslyRequestedDep undonePreviouslyRequestedDep) {
// If a previously requested dep is no longer done, restart this node from scratch.
restart(skyKey, state);
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Top priority since this node has already been evaluating, so get it off our plate.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE);
return;
} finally {
evaluatorContext
@@ -487,7 +533,8 @@ public abstract class AbstractParallelEvaluator {
}
if (maybeHandleRestart(skyKey, state, value)) {
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Top priority since this node has already been evaluating, so get it off our plate.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE);
return;
}
@@ -618,17 +665,26 @@ public abstract class AbstractParallelEvaluator {
Set<SkyKey> newDepsThatWereInTheLastEvaluation =
Sets.difference(uniqueNewDeps, newDepsThatWerentInTheLastEvaluation);
+ int childEvaluationPriority =
+ globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0;
InterruptibleSupplier<Map<SkyKey, ? extends NodeEntry>>
newDepsThatWerentInTheLastEvaluationNodes =
graph.createIfAbsentBatchAsync(
skyKey, Reason.RDEP_ADDITION, newDepsThatWerentInTheLastEvaluation);
- handleKnownChildrenForDirtyNode(newDepsThatWereInTheLastEvaluation, state);
+ handleKnownChildrenForDirtyNode(
+ newDepsThatWereInTheLastEvaluation, state, childEvaluationPriority);
for (Map.Entry<SkyKey, ? extends NodeEntry> e :
newDepsThatWerentInTheLastEvaluationNodes.get().entrySet()) {
SkyKey newDirectDep = e.getKey();
NodeEntry newDirectDepEntry = e.getValue();
- enqueueChild(skyKey, state, newDirectDep, newDirectDepEntry, /*depAlreadyExists=*/ false);
+ enqueueChild(
+ skyKey,
+ state,
+ newDirectDep,
+ newDirectDepEntry,
+ /*depAlreadyExists=*/ false,
+ childEvaluationPriority);
}
// It is critical that there is no code below this point in the try block.
} catch (InterruptedException ie) {
@@ -824,7 +880,9 @@ public abstract class AbstractParallelEvaluator {
.noteInconsistencyAndMaybeThrow(
skyKey, depKey, Inconsistency.CHILD_UNDONE_FOR_BUILDING_NODE);
if (triState == DependencyState.NEEDS_SCHEDULING) {
- evaluatorContext.getVisitor().enqueueEvaluation(depKey);
+ // Top priority since this depKey was already evaluated before, and we want to finish it off
+ // again, reducing the chance that another node may observe this dep to be undone.
+ evaluatorContext.getVisitor().enqueueEvaluation(depKey, Integer.MAX_VALUE);
}
return true;
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
index 3577230e3a..27e6b61ceb 100644
--- a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
@@ -13,14 +13,9 @@
// limitations under the License.
package com.google.devtools.build.skyframe;
-import com.google.common.base.Function;
-import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
-import com.google.devtools.build.lib.concurrent.ErrorHandler;
-import com.google.devtools.build.lib.concurrent.ExecutorParams;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingNodeVisitor;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingNodeVisitor;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.InvalidationState;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import javax.annotation.Nullable;
@@ -73,11 +68,9 @@ public final class EagerInvalidator {
QueryableGraph graph,
Iterable<SkyKey> diff,
DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
+ InvalidationState state) {
state.update(diff);
- return state.isEmpty() ? null
- : new DirtyingNodeVisitor(graph, progressReceiver, state, executorFactory);
+ return state.isEmpty() ? null : new DirtyingNodeVisitor(graph, progressReceiver, state);
}
@Nullable
@@ -87,8 +80,7 @@ public final class EagerInvalidator {
DirtyTrackingProgressReceiver progressReceiver,
InvalidationState state,
ForkJoinPool forkJoinPool,
- boolean supportInterruptions,
- ErrorHandler errorHandler) {
+ boolean supportInterruptions) {
state.update(diff);
return state.isEmpty()
? null
@@ -99,20 +91,15 @@ public final class EagerInvalidator {
forkJoinPool,
supportInterruptions);
}
- /**
- * Invalidates given values and their upward transitive closure in the graph if necessary, using
- * an executor constructed with the provided factory.
- */
+ /** Invalidates given values and their upward transitive closure in the graph if necessary. */
public static void invalidate(
QueryableGraph graph,
Iterable<SkyKey> diff,
DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory)
+ InvalidationState state)
throws InterruptedException {
DirtyingNodeVisitor visitor =
- createInvalidatingVisitorIfNeeded(
- graph, diff, progressReceiver, state, executorFactory);
+ createInvalidatingVisitorIfNeeded(graph, diff, progressReceiver, state);
if (visitor != null) {
visitor.run();
}
@@ -132,26 +119,9 @@ public final class EagerInvalidator {
throws InterruptedException {
DirtyingNodeVisitor visitor =
createInvalidatingVisitorIfNeeded(
- graph,
- diff,
- progressReceiver,
- state,
- forkJoinPool,
- supportInterruptions,
- ErrorHandler.NullHandler.INSTANCE);
+ graph, diff, progressReceiver, state, forkJoinPool, supportInterruptions);
if (visitor != null) {
visitor.run();
}
}
-
- /** Invalidates given values and their upward transitive closure in the graph. */
- public static void invalidate(
- QueryableGraph graph,
- Iterable<SkyKey> diff,
- DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state)
- throws InterruptedException {
- invalidate(graph, diff, progressReceiver, state, AbstractQueueVisitor.EXECUTOR_FACTORY);
- }
-
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index 168ad23b95..ae9af7705e 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
-import com.google.devtools.build.lib.concurrent.ExecutorParams;
import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
@@ -35,7 +34,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -88,15 +86,6 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> {
TGraph graph,
DirtyTrackingProgressReceiver progressReceiver,
InvalidationState state) {
- this(
- graph, progressReceiver, state, AbstractQueueVisitor.EXECUTOR_FACTORY);
- }
-
- protected InvalidatingNodeVisitor(
- TGraph graph,
- DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
this.executor =
new AbstractQueueVisitor(
/*parallelism=*/ DEFAULT_THREAD_COUNT,
@@ -104,7 +93,6 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> {
/*units=*/ TimeUnit.SECONDS,
/*failFastOnException=*/ true,
"skyframe-invalidator",
- executorFactory,
errorClassifier);
this.graph = Preconditions.checkNotNull(graph);
this.progressReceiver = Preconditions.checkNotNull(progressReceiver);
@@ -357,9 +345,8 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> {
protected DirtyingNodeVisitor(
QueryableGraph graph,
DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
- super(graph, progressReceiver, state, executorFactory);
+ InvalidationState state) {
+ super(graph, progressReceiver, state);
this.supportInterruptions = true;
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
index 5d1cd5f26b..46672db153 100644
--- a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
@@ -14,12 +14,12 @@
package com.google.devtools.build.skyframe;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
+import com.google.devtools.build.skyframe.ParallelEvaluatorContext.RunnableMaker;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -58,12 +58,12 @@ class NodeEntryVisitor {
* Function that allows this visitor to execute the appropriate {@link Runnable} when given a
* {@link SkyKey} to evaluate.
*/
- private final Function<SkyKey, Runnable> runnableMaker;
+ private final RunnableMaker runnableMaker;
NodeEntryVisitor(
ForkJoinPool forkJoinPool,
DirtyTrackingProgressReceiver progressReceiver,
- Function<SkyKey, Runnable> runnableMaker) {
+ RunnableMaker runnableMaker) {
this.quiescingExecutor = ForkJoinQuiescingExecutor.newBuilder()
.withOwnershipOf(forkJoinPool)
.setErrorClassifier(NODE_ENTRY_VISITOR_ERROR_CLASSIFIER)
@@ -75,15 +75,14 @@ class NodeEntryVisitor {
NodeEntryVisitor(
int threadCount,
DirtyTrackingProgressReceiver progressReceiver,
- Function<SkyKey, Runnable> runnableMaker) {
+ RunnableMaker runnableMaker) {
quiescingExecutor =
- new AbstractQueueVisitor(
+ AbstractQueueVisitor.createWithPriorityQueue(
threadCount,
/*keepAliveTime=*/ 1,
TimeUnit.SECONDS,
/*failFastOnException*/ true,
"skyframe-evaluator",
- AbstractQueueVisitor.EXECUTOR_FACTORY,
NODE_ENTRY_VISITOR_ERROR_CLASSIFIER);
this.progressReceiver = progressReceiver;
this.runnableMaker = runnableMaker;
@@ -93,7 +92,23 @@ class NodeEntryVisitor {
quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true);
}
- void enqueueEvaluation(SkyKey key) {
+ /**
+ * Enqueue {@code key} for evaluation, at {@code evaluationPriority} if this visitor is using a
+ * priority queue.
+ *
+ * <p>{@code evaluationPriority} is used to minimize evaluation "sprawl": inefficiencies coming
+ * from incompletely evaluating many nodes, versus focusing on finishing the evaluation of nodes
+ * that have already started evaluating. Sprawl can be expensive because an incompletely evaluated
+ * node keeps state in Skyframe, and often in external caches, that uses memory.
+ *
+ * <p>In general, {@code evaluationPriority} should be maximal ({@link Integer#MAX_VALUE}) when
+ * restarting a node that has already started evaluation, and minimal when enqueueing a node that
+ * no other tasks depend on. Setting {@code evaluationPriority} to the same value for all children
+ * of a parent has good results experimentally, since it prioritizes batches of work that can be
+ * used together. Similarly, prioritizing deeper nodes (depth-first search of the evaluation
+ * graph) also has good results experimentally, since it minimizes sprawl.
+ */
+ void enqueueEvaluation(SkyKey key, int evaluationPriority) {
if (preventNewEvaluations.get()) {
// If an error happens in nokeep_going mode, we still want to mark these nodes as inflight,
// otherwise cleanup will not happen properly.
@@ -101,7 +116,7 @@ class NodeEntryVisitor {
return;
}
progressReceiver.enqueueing(key);
- quiescingExecutor.execute(runnableMaker.apply(key));
+ quiescingExecutor.execute(runnableMaker.make(key, evaluationPriority));
}
/**
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java
index 60987ac7f2..db2afd10e5 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java
@@ -13,7 +13,6 @@
// limitations under the License.
package com.google.devtools.build.skyframe;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@@ -73,7 +72,7 @@ class ParallelEvaluatorContext {
final DirtyTrackingProgressReceiver progressReceiver,
EventFilter storedEventFilter,
ErrorInfoManager errorInfoManager,
- final Function<SkyKey, Runnable> runnableMaker,
+ RunnableMaker runnableMaker,
GraphInconsistencyReceiver graphInconsistencyReceiver,
final int threadCount) {
this(
@@ -101,7 +100,7 @@ class ParallelEvaluatorContext {
final DirtyTrackingProgressReceiver progressReceiver,
EventFilter storedEventFilter,
ErrorInfoManager errorInfoManager,
- final Function<SkyKey, Runnable> runnableMaker,
+ RunnableMaker runnableMaker,
GraphInconsistencyReceiver graphInconsistencyReceiver,
final ForkJoinPool forkJoinPool,
EvaluationVersionBehavior evaluationVersionBehavior) {
@@ -120,6 +119,18 @@ class ParallelEvaluatorContext {
evaluationVersionBehavior);
}
+ /**
+ * Returns a {@link Runnable} given a {@code key} to evaluate and an {@code evaluationPriority}
+ * indicating whether it should be scheduled for evaluation soon (higher is better). The returned
+ * {@link Runnable} is a {@link ComparableRunnable} so that it can be ordered by {@code
+ * evaluationPriority} in a priority queue if needed.
+ */
+ interface RunnableMaker {
+ ComparableRunnable make(SkyKey key, int evaluationPriority);
+ }
+
+ interface ComparableRunnable extends Runnable, Comparable<ComparableRunnable> {}
+
private ParallelEvaluatorContext(
QueryableGraph graph,
Version graphVersion,
@@ -174,7 +185,7 @@ class ParallelEvaluatorContext {
for (SkyKey key : keys) {
NodeEntry entry = Preconditions.checkNotNull(batch.get(key), key);
if (entry.signalDep(version)) {
- getVisitor().enqueueEvaluation(key);
+ getVisitor().enqueueEvaluation(key, Integer.MAX_VALUE);
}
}
return;
diff --git a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java
index d48cc5d2ec..7a3aa5f312 100644
--- a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java
+++ b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java
@@ -137,7 +137,6 @@ public class ConcurrentMultimapWithHeadElementTest {
TimeUnit.SECONDS,
/*failFastOnException=*/ true,
"action-graph-test",
- AbstractQueueVisitor.EXECUTOR_FACTORY,
ErrorClassifier.DEFAULT);
}
diff --git a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java
index 5de14d956c..5ec0d4a58f 100644
--- a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java
+++ b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java
@@ -89,7 +89,6 @@ public class MapBasedActionGraphTest {
TimeUnit.SECONDS,
/*failFastOnException=*/ true,
"action-graph-test",
- AbstractQueueVisitor.EXECUTOR_FACTORY,
ErrorClassifier.DEFAULT);
Path execRoot = fileSystem.getPath("/");
Path root = fileSystem.getPath("/root");
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
index 070011b0d9..79ab720789 100644
--- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
@@ -564,7 +564,6 @@ public class AbstractQueueVisitorTest {
TimeUnit.SECONDS,
/* failFastOnException= */ false,
THREAD_NAME,
- AbstractQueueVisitor.EXECUTOR_FACTORY,
ErrorClassifier.DEFAULT);
}
@@ -603,7 +602,6 @@ public class AbstractQueueVisitorTest {
TimeUnit.SECONDS,
/* failFastOnException= */ false,
THREAD_NAME,
- AbstractQueueVisitor.EXECUTOR_FACTORY,
ErrorClassifier.DEFAULT);
}
@@ -614,7 +612,6 @@ public class AbstractQueueVisitorTest {
TimeUnit.SECONDS,
failFast,
THREAD_NAME,
- AbstractQueueVisitor.EXECUTOR_FACTORY,
ErrorClassifier.DEFAULT);
}
diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
index 223b14b37c..6239319247 100644
--- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;
import com.google.common.testing.GcFinalization;
-import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.testutil.TestUtils;
import com.google.devtools.build.lib.util.Pair;
@@ -562,8 +561,7 @@ public class EagerInvalidatorTest {
ImmutableList<SkyKey> diff = ImmutableList.of(GraphTester.nonHermeticKey("a"));
InvalidationState state1 = new DirtyingInvalidationState();
Preconditions.checkNotNull(
- EagerInvalidator.createInvalidatingVisitorIfNeeded(
- graph, diff, receiver, state1, AbstractQueueVisitor.EXECUTOR_FACTORY))
+ EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff, receiver, state1))
.run();
assertThat(receiver.getUnenqueuedDirtyKeys()).containsExactly(diff.get(0), skyKey("ab"));
@@ -585,12 +583,7 @@ public class EagerInvalidatorTest {
throws InterruptedException {
Iterable<SkyKey> diff = ImmutableList.copyOf(keys);
DirtyingNodeVisitor dirtyingNodeVisitor =
- EagerInvalidator.createInvalidatingVisitorIfNeeded(
- graph,
- diff,
- progressReceiver,
- state,
- AbstractQueueVisitor.EXECUTOR_FACTORY);
+ EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff, progressReceiver, state);
if (dirtyingNodeVisitor != null) {
visitor.set(dirtyingNodeVisitor);
dirtyingNodeVisitor.run();