aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/skyframe
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/skyframe')
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java4
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java92
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java44
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java17
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java31
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java19
6 files changed, 125 insertions, 82 deletions
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java
index f8f21f254e..268db07014 100644
--- a/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/AbstractExceptionalParallelEvaluator.java
@@ -249,7 +249,9 @@ public abstract class AbstractExceptionalParallelEvaluator<E extends Exception>
// This must be equivalent to the code in enqueueChild above, in order to be thread-safe.
switch (entry.addReverseDepAndCheckIfDone(null)) {
case NEEDS_SCHEDULING:
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Low priority because this node is not needed by any other currently evaluating node.
+ // So keep it at the back of the queue as long as there's other useful work to be done.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MIN_VALUE);
break;
case DONE:
informProgressReceiverThatValueIsDone(skyKey, entry);
diff --git a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
index 04fcdeae91..1b1cd432c7 100644
--- a/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/AbstractParallelEvaluator.java
@@ -43,6 +43,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -53,6 +54,15 @@ import javax.annotation.Nullable;
* <p>This does not implement other parts of Skyframe evaluation setup and post-processing, such as
* translating a set of requested top-level nodes into actions, or constructing an evaluation
* result. Derived classes should do this.
+ *
+ * <p>Work is prioritized in a depth-first fashion when using the {@link
+ * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler,
+ * EmittedEventState, EventFilter, ErrorInfoManager, boolean, int, DirtyTrackingProgressReceiver,
+ * GraphInconsistencyReceiver, CycleDetector)} constructor, but not when using the {@link
+ * #AbstractParallelEvaluator(ProcessableGraph, Version, ImmutableMap, ExtendedEventHandler,
+ * EmittedEventState, EventFilter, ErrorInfoManager, boolean, DirtyTrackingProgressReceiver,
+ * GraphInconsistencyReceiver, ForkJoinPool, CycleDetector, EvaluationVersionBehavior)} (the
+ * constructor with a {@link ForkJoinPool}).
*/
public abstract class AbstractParallelEvaluator {
private static final Logger logger = Logger.getLogger(AbstractParallelEvaluator.class.getName());
@@ -60,6 +70,7 @@ public abstract class AbstractParallelEvaluator {
final ProcessableGraph graph;
final ParallelEvaluatorContext evaluatorContext;
protected final CycleDetector cycleDetector;
+ @Nullable private final AtomicInteger globalEnqueuedIndex;
AbstractParallelEvaluator(
ProcessableGraph graph,
@@ -87,9 +98,10 @@ public abstract class AbstractParallelEvaluator {
progressReceiver,
storedEventFilter,
errorInfoManager,
- Evaluate::new,
+ (skyKey, evaluationPriority) -> new Evaluate(evaluationPriority, skyKey),
graphInconsistencyReceiver,
threadCount);
+ this.globalEnqueuedIndex = new AtomicInteger(0);
}
AbstractParallelEvaluator(
@@ -119,10 +131,11 @@ public abstract class AbstractParallelEvaluator {
progressReceiver,
storedEventFilter,
errorInfoManager,
- Evaluate::new,
+ (skyKey, evaluationPriority) -> new Evaluate(0, skyKey),
graphInconsistencyReceiver,
Preconditions.checkNotNull(forkJoinPool),
evaluationVersionBehavior);
+ this.globalEnqueuedIndex = null;
}
/**
@@ -141,21 +154,40 @@ public abstract class AbstractParallelEvaluator {
NEEDS_EVALUATION
}
- /** An action that evaluates a value. */
- private class Evaluate implements Runnable {
+ /**
+ * An action that evaluates a value.
+ *
+ * <p>{@link Comparable} for use in priority queues. Experimentally, grouping enqueued evaluations
+ * together by parent leads to fewer in-flight evaluations and thus lower peak memory usage. Thus
+ * we store the {@link #evaluationPriority} (coming from the {@link #globalEnqueuedIndex} and use
+ * it for comparisons: later enqueuings should be evaluated earlier, to do a depth-first search,
+ * except for re-enqueued nodes, which always get top priority.
+ *
+ * <p>This is not applicable when using a {@link ForkJoinPool}, since it does not allow for easy
+ * work prioritization.
+ */
+ private class Evaluate implements ParallelEvaluatorContext.ComparableRunnable {
+ private final int evaluationPriority;
/** The name of the value to be evaluated. */
private final SkyKey skyKey;
- private Evaluate(SkyKey skyKey) {
+ private Evaluate(int evaluationPriority, SkyKey skyKey) {
+ this.evaluationPriority = evaluationPriority;
this.skyKey = skyKey;
}
+ @Override
+ public int compareTo(ParallelEvaluatorContext.ComparableRunnable o) {
+ return -1 * Integer.compare(this.evaluationPriority, ((Evaluate) o).evaluationPriority);
+ }
+
private void enqueueChild(
SkyKey skyKey,
NodeEntry entry,
SkyKey child,
NodeEntry childEntry,
- boolean depAlreadyExists)
+ boolean depAlreadyExists,
+ int childEvaluationPriority)
throws InterruptedException {
Preconditions.checkState(!entry.isDone(), "%s %s", skyKey, entry);
DependencyState dependencyState =
@@ -166,13 +198,15 @@ public abstract class AbstractParallelEvaluator {
case DONE:
if (entry.signalDep(childEntry.getVersion())) {
// This can only happen if there are no more children to be added.
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Maximum priority, since this node has already started evaluation before, and we want
+ // it off our plate.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE);
}
break;
case ALREADY_EVALUATING:
break;
case NEEDS_SCHEDULING:
- evaluatorContext.getVisitor().enqueueEvaluation(child);
+ evaluatorContext.getVisitor().enqueueEvaluation(child, childEvaluationPriority);
break;
}
}
@@ -288,7 +322,10 @@ public abstract class AbstractParallelEvaluator {
unknownStatusDeps);
continue;
}
- handleKnownChildrenForDirtyNode(unknownStatusDeps, state);
+ handleKnownChildrenForDirtyNode(
+ unknownStatusDeps,
+ state,
+ globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0);
return DirtyOutcome.ALREADY_PROCESSED;
}
switch (state.getDirtyState()) {
@@ -321,7 +358,8 @@ public abstract class AbstractParallelEvaluator {
}
}
- private void handleKnownChildrenForDirtyNode(Collection<SkyKey> knownChildren, NodeEntry state)
+ private void handleKnownChildrenForDirtyNode(
+ Collection<SkyKey> knownChildren, NodeEntry state, int childEvaluationPriority)
throws InterruptedException {
Map<SkyKey, ? extends NodeEntry> oldChildren =
graph.getBatch(skyKey, Reason.ENQUEUING_CHILD, knownChildren);
@@ -342,7 +380,8 @@ public abstract class AbstractParallelEvaluator {
state,
recreatedEntry.getKey(),
recreatedEntry.getValue(),
- /*depAlreadyExists=*/ false);
+ /*depAlreadyExists=*/ false,
+ childEvaluationPriority);
}
}
for (Map.Entry<SkyKey, ? extends NodeEntry> e : oldChildren.entrySet()) {
@@ -350,7 +389,13 @@ public abstract class AbstractParallelEvaluator {
NodeEntry directDepEntry = e.getValue();
// TODO(bazel-team): If this signals the current node and makes it ready, consider
// evaluating it in this thread instead of scheduling a new evaluation.
- enqueueChild(skyKey, state, directDep, directDepEntry, /*depAlreadyExists=*/ true);
+ enqueueChild(
+ skyKey,
+ state,
+ directDep,
+ directDepEntry,
+ /*depAlreadyExists=*/ true,
+ childEvaluationPriority);
}
}
@@ -381,7 +426,8 @@ public abstract class AbstractParallelEvaluator {
} catch (UndonePreviouslyRequestedDep undonePreviouslyRequestedDep) {
// If a previously requested dep is no longer done, restart this node from scratch.
restart(skyKey, state);
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Top priority since this node has already been evaluating, so get it off our plate.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE);
return;
} finally {
evaluatorContext
@@ -487,7 +533,8 @@ public abstract class AbstractParallelEvaluator {
}
if (maybeHandleRestart(skyKey, state, value)) {
- evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
+ // Top priority since this node has already been evaluating, so get it off our plate.
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey, Integer.MAX_VALUE);
return;
}
@@ -618,17 +665,26 @@ public abstract class AbstractParallelEvaluator {
Set<SkyKey> newDepsThatWereInTheLastEvaluation =
Sets.difference(uniqueNewDeps, newDepsThatWerentInTheLastEvaluation);
+ int childEvaluationPriority =
+ globalEnqueuedIndex != null ? globalEnqueuedIndex.incrementAndGet() : 0;
InterruptibleSupplier<Map<SkyKey, ? extends NodeEntry>>
newDepsThatWerentInTheLastEvaluationNodes =
graph.createIfAbsentBatchAsync(
skyKey, Reason.RDEP_ADDITION, newDepsThatWerentInTheLastEvaluation);
- handleKnownChildrenForDirtyNode(newDepsThatWereInTheLastEvaluation, state);
+ handleKnownChildrenForDirtyNode(
+ newDepsThatWereInTheLastEvaluation, state, childEvaluationPriority);
for (Map.Entry<SkyKey, ? extends NodeEntry> e :
newDepsThatWerentInTheLastEvaluationNodes.get().entrySet()) {
SkyKey newDirectDep = e.getKey();
NodeEntry newDirectDepEntry = e.getValue();
- enqueueChild(skyKey, state, newDirectDep, newDirectDepEntry, /*depAlreadyExists=*/ false);
+ enqueueChild(
+ skyKey,
+ state,
+ newDirectDep,
+ newDirectDepEntry,
+ /*depAlreadyExists=*/ false,
+ childEvaluationPriority);
}
// It is critical that there is no code below this point in the try block.
} catch (InterruptedException ie) {
@@ -824,7 +880,9 @@ public abstract class AbstractParallelEvaluator {
.noteInconsistencyAndMaybeThrow(
skyKey, depKey, Inconsistency.CHILD_UNDONE_FOR_BUILDING_NODE);
if (triState == DependencyState.NEEDS_SCHEDULING) {
- evaluatorContext.getVisitor().enqueueEvaluation(depKey);
+ // Top priority since this depKey was already evaluated before, and we want to finish it off
+ // again, reducing the chance that another node may observe this dep to be undone.
+ evaluatorContext.getVisitor().enqueueEvaluation(depKey, Integer.MAX_VALUE);
}
return true;
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
index 3577230e3a..27e6b61ceb 100644
--- a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
@@ -13,14 +13,9 @@
// limitations under the License.
package com.google.devtools.build.skyframe;
-import com.google.common.base.Function;
-import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
-import com.google.devtools.build.lib.concurrent.ErrorHandler;
-import com.google.devtools.build.lib.concurrent.ExecutorParams;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingNodeVisitor;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingNodeVisitor;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.InvalidationState;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import javax.annotation.Nullable;
@@ -73,11 +68,9 @@ public final class EagerInvalidator {
QueryableGraph graph,
Iterable<SkyKey> diff,
DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
+ InvalidationState state) {
state.update(diff);
- return state.isEmpty() ? null
- : new DirtyingNodeVisitor(graph, progressReceiver, state, executorFactory);
+ return state.isEmpty() ? null : new DirtyingNodeVisitor(graph, progressReceiver, state);
}
@Nullable
@@ -87,8 +80,7 @@ public final class EagerInvalidator {
DirtyTrackingProgressReceiver progressReceiver,
InvalidationState state,
ForkJoinPool forkJoinPool,
- boolean supportInterruptions,
- ErrorHandler errorHandler) {
+ boolean supportInterruptions) {
state.update(diff);
return state.isEmpty()
? null
@@ -99,20 +91,15 @@ public final class EagerInvalidator {
forkJoinPool,
supportInterruptions);
}
- /**
- * Invalidates given values and their upward transitive closure in the graph if necessary, using
- * an executor constructed with the provided factory.
- */
+ /** Invalidates given values and their upward transitive closure in the graph if necessary. */
public static void invalidate(
QueryableGraph graph,
Iterable<SkyKey> diff,
DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory)
+ InvalidationState state)
throws InterruptedException {
DirtyingNodeVisitor visitor =
- createInvalidatingVisitorIfNeeded(
- graph, diff, progressReceiver, state, executorFactory);
+ createInvalidatingVisitorIfNeeded(graph, diff, progressReceiver, state);
if (visitor != null) {
visitor.run();
}
@@ -132,26 +119,9 @@ public final class EagerInvalidator {
throws InterruptedException {
DirtyingNodeVisitor visitor =
createInvalidatingVisitorIfNeeded(
- graph,
- diff,
- progressReceiver,
- state,
- forkJoinPool,
- supportInterruptions,
- ErrorHandler.NullHandler.INSTANCE);
+ graph, diff, progressReceiver, state, forkJoinPool, supportInterruptions);
if (visitor != null) {
visitor.run();
}
}
-
- /** Invalidates given values and their upward transitive closure in the graph. */
- public static void invalidate(
- QueryableGraph graph,
- Iterable<SkyKey> diff,
- DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state)
- throws InterruptedException {
- invalidate(graph, diff, progressReceiver, state, AbstractQueueVisitor.EXECUTOR_FACTORY);
- }
-
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index 168ad23b95..ae9af7705e 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
-import com.google.devtools.build.lib.concurrent.ExecutorParams;
import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
@@ -35,7 +34,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -88,15 +86,6 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> {
TGraph graph,
DirtyTrackingProgressReceiver progressReceiver,
InvalidationState state) {
- this(
- graph, progressReceiver, state, AbstractQueueVisitor.EXECUTOR_FACTORY);
- }
-
- protected InvalidatingNodeVisitor(
- TGraph graph,
- DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
this.executor =
new AbstractQueueVisitor(
/*parallelism=*/ DEFAULT_THREAD_COUNT,
@@ -104,7 +93,6 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> {
/*units=*/ TimeUnit.SECONDS,
/*failFastOnException=*/ true,
"skyframe-invalidator",
- executorFactory,
errorClassifier);
this.graph = Preconditions.checkNotNull(graph);
this.progressReceiver = Preconditions.checkNotNull(progressReceiver);
@@ -357,9 +345,8 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> {
protected DirtyingNodeVisitor(
QueryableGraph graph,
DirtyTrackingProgressReceiver progressReceiver,
- InvalidationState state,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
- super(graph, progressReceiver, state, executorFactory);
+ InvalidationState state) {
+ super(graph, progressReceiver, state);
this.supportInterruptions = true;
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
index 5d1cd5f26b..46672db153 100644
--- a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
@@ -14,12 +14,12 @@
package com.google.devtools.build.skyframe;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
+import com.google.devtools.build.skyframe.ParallelEvaluatorContext.RunnableMaker;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@@ -58,12 +58,12 @@ class NodeEntryVisitor {
* Function that allows this visitor to execute the appropriate {@link Runnable} when given a
* {@link SkyKey} to evaluate.
*/
- private final Function<SkyKey, Runnable> runnableMaker;
+ private final RunnableMaker runnableMaker;
NodeEntryVisitor(
ForkJoinPool forkJoinPool,
DirtyTrackingProgressReceiver progressReceiver,
- Function<SkyKey, Runnable> runnableMaker) {
+ RunnableMaker runnableMaker) {
this.quiescingExecutor = ForkJoinQuiescingExecutor.newBuilder()
.withOwnershipOf(forkJoinPool)
.setErrorClassifier(NODE_ENTRY_VISITOR_ERROR_CLASSIFIER)
@@ -75,15 +75,14 @@ class NodeEntryVisitor {
NodeEntryVisitor(
int threadCount,
DirtyTrackingProgressReceiver progressReceiver,
- Function<SkyKey, Runnable> runnableMaker) {
+ RunnableMaker runnableMaker) {
quiescingExecutor =
- new AbstractQueueVisitor(
+ AbstractQueueVisitor.createWithPriorityQueue(
threadCount,
/*keepAliveTime=*/ 1,
TimeUnit.SECONDS,
/*failFastOnException*/ true,
"skyframe-evaluator",
- AbstractQueueVisitor.EXECUTOR_FACTORY,
NODE_ENTRY_VISITOR_ERROR_CLASSIFIER);
this.progressReceiver = progressReceiver;
this.runnableMaker = runnableMaker;
@@ -93,7 +92,23 @@ class NodeEntryVisitor {
quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true);
}
- void enqueueEvaluation(SkyKey key) {
+ /**
+ * Enqueue {@code key} for evaluation, at {@code evaluationPriority} if this visitor is using a
+ * priority queue.
+ *
+ * <p>{@code evaluationPriority} is used to minimize evaluation "sprawl": inefficiencies coming
+ * from incompletely evaluating many nodes, versus focusing on finishing the evaluation of nodes
+ * that have already started evaluating. Sprawl can be expensive because an incompletely evaluated
+ * node keeps state in Skyframe, and often in external caches, that uses memory.
+ *
+ * <p>In general, {@code evaluationPriority} should be maximal ({@link Integer#MAX_VALUE}) when
+ * restarting a node that has already started evaluation, and minimal when enqueueing a node that
+ * no other tasks depend on. Setting {@code evaluationPriority} to the same value for all children
+ * of a parent has good results experimentally, since it prioritizes batches of work that can be
+ * used together. Similarly, prioritizing deeper nodes (depth-first search of the evaluation
+ * graph) also has good results experimentally, since it minimizes sprawl.
+ */
+ void enqueueEvaluation(SkyKey key, int evaluationPriority) {
if (preventNewEvaluations.get()) {
// If an error happens in nokeep_going mode, we still want to mark these nodes as inflight,
// otherwise cleanup will not happen properly.
@@ -101,7 +116,7 @@ class NodeEntryVisitor {
return;
}
progressReceiver.enqueueing(key);
- quiescingExecutor.execute(runnableMaker.apply(key));
+ quiescingExecutor.execute(runnableMaker.make(key, evaluationPriority));
}
/**
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java
index 60987ac7f2..db2afd10e5 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java
@@ -13,7 +13,6 @@
// limitations under the License.
package com.google.devtools.build.skyframe;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
@@ -73,7 +72,7 @@ class ParallelEvaluatorContext {
final DirtyTrackingProgressReceiver progressReceiver,
EventFilter storedEventFilter,
ErrorInfoManager errorInfoManager,
- final Function<SkyKey, Runnable> runnableMaker,
+ RunnableMaker runnableMaker,
GraphInconsistencyReceiver graphInconsistencyReceiver,
final int threadCount) {
this(
@@ -101,7 +100,7 @@ class ParallelEvaluatorContext {
final DirtyTrackingProgressReceiver progressReceiver,
EventFilter storedEventFilter,
ErrorInfoManager errorInfoManager,
- final Function<SkyKey, Runnable> runnableMaker,
+ RunnableMaker runnableMaker,
GraphInconsistencyReceiver graphInconsistencyReceiver,
final ForkJoinPool forkJoinPool,
EvaluationVersionBehavior evaluationVersionBehavior) {
@@ -120,6 +119,18 @@ class ParallelEvaluatorContext {
evaluationVersionBehavior);
}
+ /**
+ * Returns a {@link Runnable} given a {@code key} to evaluate and an {@code evaluationPriority}
+ * indicating whether it should be scheduled for evaluation soon (higher is better). The returned
+ * {@link Runnable} is a {@link ComparableRunnable} so that it can be ordered by {@code
+ * evaluationPriority} in a priority queue if needed.
+ */
+ interface RunnableMaker {
+ ComparableRunnable make(SkyKey key, int evaluationPriority);
+ }
+
+ interface ComparableRunnable extends Runnable, Comparable<ComparableRunnable> {}
+
private ParallelEvaluatorContext(
QueryableGraph graph,
Version graphVersion,
@@ -174,7 +185,7 @@ class ParallelEvaluatorContext {
for (SkyKey key : keys) {
NodeEntry entry = Preconditions.checkNotNull(batch.get(key), key);
if (entry.signalDep(version)) {
- getVisitor().enqueueEvaluation(key);
+ getVisitor().enqueueEvaluation(key, Integer.MAX_VALUE);
}
}
return;