aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Janak Ramakrishnan <janakr@google.com>2016-08-25 22:42:37 +0000
committerGravatar John Cater <jcater@google.com>2016-08-26 18:39:02 +0000
commitb449e3fa913a99c76b22f3301d1ec29c12b9e2f9 (patch)
tree96816087422822279870642a4bc64111524a2f17 /src
parent7576acc4702962af9143e49a6e0da89490449ca2 (diff)
Refactor ParallelEvaluator in preparation for making it more modular with respect to cycle checking.
Reducing the size of ParallelEvaluator.java is also probably long overdue. I believe this change stands on its own, but if you don't think the third change is worth it, and this isn't worth it on its own, feel free to push back. -- MOS_MIGRATED_REVID=131340165
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/EventFilter.java26
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java1
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java154
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java1069
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java216
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/SkyFunctionEnvironment.java609
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/SkyValueSupplier.java42
-rw-r--r--src/test/java/com/google/devtools/build/skyframe/ChainedFunction.java3
-rw-r--r--src/test/java/com/google/devtools/build/skyframe/MemoizingEvaluatorTest.java2
-rw-r--r--src/test/java/com/google/devtools/build/skyframe/ParallelEvaluatorTest.java1
10 files changed, 1234 insertions, 889 deletions
diff --git a/src/main/java/com/google/devtools/build/skyframe/EventFilter.java b/src/main/java/com/google/devtools/build/skyframe/EventFilter.java
new file mode 100644
index 0000000000..113e9f3548
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/skyframe/EventFilter.java
@@ -0,0 +1,26 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.skyframe;
+
+import com.google.common.base.Predicate;
+import com.google.devtools.build.lib.events.Event;
+
+/** Filters out events which should not be stored during evaluation in {@link ParallelEvaluator}. */
+public interface EventFilter extends Predicate<Event> {
+ /**
+ * Returns true if any events should be stored. Otherwise, optimizations may be made to avoid
+ * doing unnecessary work when evaluating node entries.
+ */
+ boolean storeEvents();
+}
diff --git a/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java
index 972d020e3b..254548711c 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InMemoryMemoizingEvaluator.java
@@ -27,7 +27,6 @@ import com.google.devtools.build.skyframe.Differencer.Diff;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingInvalidationState;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingInvalidationState;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.InvalidationState;
-import com.google.devtools.build.skyframe.ParallelEvaluator.EventFilter;
import com.google.devtools.build.skyframe.ParallelEvaluator.Receiver;
import com.google.devtools.build.skyframe.QueryableGraph.Reason;
import java.io.PrintStream;
diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
new file mode 100644
index 0000000000..6d0dbfa6ff
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
@@ -0,0 +1,154 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.skyframe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Sets;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.ErrorClassifier;
+import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
+import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Threadpool manager for {@link ParallelEvaluator}. Wraps a {@link QuiescingExecutor} and keeps
+ * track of pending nodes.
+ */
+class NodeEntryVisitor {
+ private static final ErrorClassifier NODE_ENTRY_VISITOR_ERROR_CLASSIFIER =
+ new ErrorClassifier() {
+ @Override
+ protected ErrorClassification classifyException(Exception e) {
+ if (e instanceof SchedulerException) {
+ return ErrorClassification.CRITICAL;
+ }
+ if (e instanceof RuntimeException) {
+ // We treat non-SchedulerException RuntimeExceptions as more severe than
+ // SchedulerExceptions so that AbstractQueueVisitor will propagate instances of the
+ // former. They indicate actual Blaze bugs, rather than normal Skyframe evaluation
+ // control flow.
+ return ErrorClassification.CRITICAL_AND_LOG;
+ }
+ return ErrorClassification.NOT_CRITICAL;
+ }
+ };
+
+ private final QuiescingExecutor quiescingExecutor;
+ private final AtomicBoolean preventNewEvaluations = new AtomicBoolean(false);
+ private final Set<SkyKey> inflightNodes = Sets.newConcurrentHashSet();
+ private final Set<RuntimeException> crashes = Sets.newConcurrentHashSet();
+ private final DirtyKeyTracker dirtyKeyTracker;
+ private final EvaluationProgressReceiver progressReceiver;
+ /**
+ * Function that allows this visitor to execute the appropriate {@link Runnable} when given a
+ * {@link SkyKey} to evaluate.
+ */
+ private final Function<SkyKey, Runnable> runnableMaker;
+
+ NodeEntryVisitor(
+ ForkJoinPool forkJoinPool,
+ DirtyKeyTracker dirtyKeyTracker,
+ EvaluationProgressReceiver progressReceiver,
+ Function<SkyKey, Runnable> runnableMaker) {
+ quiescingExecutor =
+ new ForkJoinQuiescingExecutor(forkJoinPool, NODE_ENTRY_VISITOR_ERROR_CLASSIFIER);
+ this.dirtyKeyTracker = dirtyKeyTracker;
+ this.progressReceiver = progressReceiver;
+ this.runnableMaker = runnableMaker;
+ }
+
+ NodeEntryVisitor(
+ int threadCount,
+ DirtyKeyTracker dirtyKeyTracker,
+ EvaluationProgressReceiver progressReceiver,
+ Function<SkyKey, Runnable> runnableMaker) {
+ quiescingExecutor =
+ new AbstractQueueVisitor(
+ /*concurrent*/ true,
+ threadCount,
+ /*keepAliveTime=*/ 1,
+ TimeUnit.SECONDS,
+ /*failFastOnException*/ true,
+ "skyframe-evaluator",
+ NODE_ENTRY_VISITOR_ERROR_CLASSIFIER);
+ this.dirtyKeyTracker = dirtyKeyTracker;
+ this.progressReceiver = progressReceiver;
+ this.runnableMaker = runnableMaker;
+ }
+
+ void waitForCompletion() throws InterruptedException {
+ quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true);
+ }
+
+ void enqueueEvaluation(SkyKey key) {
+ // We unconditionally add the key to the set of in-flight nodes because even if evaluation is
+ // never scheduled we still want to remove the previously created NodeEntry from the graph.
+ // Otherwise we would leave the graph in a weird state (wasteful garbage in the best case and
+ // inconsistent in the worst case).
+ boolean newlyEnqueued = inflightNodes.add(key);
+ // All nodes enqueued for evaluation will be either verified clean, re-evaluated, or cleaned
+ // up after being in-flight when an error happens in nokeep_going mode or in the event of an
+ // interrupt. In any of these cases, they won't be dirty anymore.
+ if (newlyEnqueued) {
+ dirtyKeyTracker.notDirty(key);
+ }
+ if (preventNewEvaluations.get()) {
+ return;
+ }
+ if (newlyEnqueued && progressReceiver != null) {
+ progressReceiver.enqueueing(key);
+ }
+ quiescingExecutor.execute(runnableMaker.apply(key));
+ }
+
+ /**
+ * Stop any new evaluations from being enqueued. Returns whether this was the first thread to
+ * request a halt. If true, this thread should proceed to throw an exception. If false, another
+ * thread already requested a halt and will throw an exception, and so this thread can simply end.
+ */
+ boolean preventNewEvaluations() {
+ return preventNewEvaluations.compareAndSet(false, true);
+ }
+
+ void noteCrash(RuntimeException e) {
+ crashes.add(e);
+ }
+
+ Collection<RuntimeException> getCrashes() {
+ return crashes;
+ }
+
+ void notifyDone(SkyKey key) {
+ inflightNodes.remove(key);
+ }
+
+ boolean isInflight(SkyKey key) {
+ return inflightNodes.contains(key);
+ }
+
+ Set<SkyKey> getInflightNodes() {
+ return inflightNodes;
+ }
+
+ @VisibleForTesting
+ CountDownLatch getExceptionLatchForTestingOnly() {
+ return quiescingExecutor.getExceptionLatchForTestingOnly();
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
index 0698acb8ca..a1c922cdfe 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
@@ -13,11 +13,8 @@
// limitations under the License.
package com.google.devtools.build.skyframe;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@@ -26,17 +23,9 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.devtools.build.lib.collect.nestedset.NestedSet;
-import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
-import com.google.devtools.build.lib.collect.nestedset.NestedSetVisitor;
-import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
-import com.google.devtools.build.lib.concurrent.ErrorClassifier;
-import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
-import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
-import com.google.devtools.build.lib.events.StoredEventHandler;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.ProfilerTask;
@@ -53,20 +42,15 @@ import com.google.devtools.build.skyframe.SkyFunctionException.ReifiedSkyFunctio
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -105,44 +89,7 @@ public final class ParallelEvaluator implements Evaluator {
private static final Logger LOG = Logger.getLogger(ParallelEvaluator.class.getName());
- private static final boolean PREFETCH_OLD_DEPS =
- Boolean.parseBoolean(
- System.getProperty("skyframe.ParallelEvaluator.PrefetchOldDeps", "true"));
-
- /** Filters out events which should not be stored. */
- public interface EventFilter extends Predicate<Event> {
- /**
- * Returns true if any events should be stored. Otherwise, optimizations may be made to avoid
- * doing unnecessary work.
- */
- boolean storeEvents();
- }
-
private final ProcessableGraph graph;
- private final Version graphVersion;
-
- private static class SkyValueSupplier implements Supplier<SkyValue> {
-
- private final NodeEntry state;
-
- public SkyValueSupplier(NodeEntry state) {
- this.state = state;
- }
-
- @Override
- public SkyValue get() {
- try {
- return state.getValue();
- } catch (InterruptedException e) {
- throw new IllegalStateException(
- "Graph implementations in which value retrieval can block should not be used in "
- + "frameworks that use the value in EvaluationProgressReceiver, since that could "
- + "result in significant slowdowns: "
- + state,
- e);
- }
- }
- }
/** An general interface for {@link ParallelEvaluator} to receive objects of type {@code T}. */
public interface Receiver<T> {
@@ -153,18 +100,10 @@ public final class ParallelEvaluator implements Evaluator {
void accept(T object);
}
- private final ImmutableMap<SkyFunctionName, ? extends SkyFunction> skyFunctions;
-
- private final EventHandler reporter;
- private final NestedSetVisitor<TaggedEvents> replayingNestedSetEventVisitor;
- private final boolean keepGoing;
- private final boolean storeErrorsAlongsideValues;
- private final int threadCount;
- @Nullable private final ForkJoinPool forkJoinPool;
- @Nullable private final EvaluationProgressReceiver progressReceiver;
private final DirtyKeyTracker dirtyKeyTracker;
private final Receiver<Collection<SkyKey>> inflightKeysReceiver;
- private final EventFilter storedEventFilter;
+
+ private final ParallelEvaluatorContext evaluatorContext;
public ParallelEvaluator(
ProcessableGraph graph,
@@ -179,19 +118,22 @@ public final class ParallelEvaluator implements Evaluator {
DirtyKeyTracker dirtyKeyTracker,
Receiver<Collection<SkyKey>> inflightKeysReceiver) {
this.graph = graph;
- this.skyFunctions = skyFunctions;
- this.graphVersion = graphVersion;
this.inflightKeysReceiver = inflightKeysReceiver;
- this.reporter = Preconditions.checkNotNull(reporter);
- this.keepGoing = keepGoing;
- this.storeErrorsAlongsideValues = true;
- this.threadCount = threadCount;
- this.progressReceiver = progressReceiver;
this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker);
- this.replayingNestedSetEventVisitor =
- new NestedSetVisitor<>(new NestedSetEventReceiver(reporter), emittedEventState);
- this.storedEventFilter = storedEventFilter;
- this.forkJoinPool = null;
+ evaluatorContext =
+ new ParallelEvaluatorContext(
+ graph,
+ graphVersion,
+ skyFunctions,
+ reporter,
+ emittedEventState,
+ keepGoing,
+ /*storeErrorsAlongsideValues=*/ true,
+ progressReceiver,
+ storedEventFilter,
+ dirtyKeyTracker,
+ createEvaluateRunnable(),
+ threadCount);
}
public ParallelEvaluator(
@@ -208,671 +150,37 @@ public final class ParallelEvaluator implements Evaluator {
Receiver<Collection<SkyKey>> inflightKeysReceiver,
ForkJoinPool forkJoinPool) {
this.graph = graph;
- this.skyFunctions = skyFunctions;
- this.graphVersion = graphVersion;
this.inflightKeysReceiver = inflightKeysReceiver;
- this.reporter = Preconditions.checkNotNull(reporter);
- this.keepGoing = keepGoing;
- this.storeErrorsAlongsideValues = storeErrorsAlongsideValues;
Preconditions.checkState(storeErrorsAlongsideValues || keepGoing);
- this.threadCount = 0;
- this.progressReceiver = progressReceiver;
this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker);
- this.replayingNestedSetEventVisitor =
- new NestedSetVisitor<>(new NestedSetEventReceiver(reporter), emittedEventState);
- this.storedEventFilter = storedEventFilter;
- this.forkJoinPool = Preconditions.checkNotNull(forkJoinPool);
- }
-
- private Map<SkyKey, ? extends NodeEntry> getBatchValues(
- SkyKey parent, Reason reason, Iterable<SkyKey> keys) throws InterruptedException {
- return graph.getBatch(parent, reason, keys);
- }
-
- /** Receives the events from the NestedSet and delegates to the reporter. */
- private static class NestedSetEventReceiver implements NestedSetVisitor.Receiver<TaggedEvents> {
-
- private final EventHandler reporter;
-
- public NestedSetEventReceiver(EventHandler reporter) {
- this.reporter = reporter;
- }
- @Override
- public void accept(TaggedEvents events) {
- String tag = events.getTag();
- for (Event e : events.getEvents()) {
- reporter.handle(e.withTag(tag));
- }
- }
+ evaluatorContext =
+ new ParallelEvaluatorContext(
+ graph,
+ graphVersion,
+ skyFunctions,
+ reporter,
+ emittedEventState,
+ keepGoing,
+ storeErrorsAlongsideValues,
+ progressReceiver,
+ storedEventFilter,
+ dirtyKeyTracker,
+ createEvaluateRunnable(),
+ Preconditions.checkNotNull(forkJoinPool));
}
/**
- * A suitable {@link SkyFunction.Environment} implementation.
+ * Creates a {@link Runnable} that is injected into the {@link NodeEntryVisitor} created on demand
+ * in {@link #evaluatorContext}, so that the visitor can enqueue the appropriate {@link Runnable}
+ * when it is given a {@link SkyKey} to evaluate.
*/
- class SkyFunctionEnvironment extends AbstractSkyFunctionEnvironment {
- private boolean building = true;
- private SkyKey depErrorKey = null;
- private final SkyKey skyKey;
- /**
- * The deps requested during the previous build of this node. Used for two reasons: (1) They are
- * fetched eagerly before the node is built, to potentially prime the graph and speed up
- * requests for them during evaluation. (2) When the node finishes building, any deps from the
- * previous build that are not deps from this build must have this node removed from them as a
- * reverse dep. Thus, it is important that all nodes in this set have the property that they
- * have this node as a reverse dep from the last build, but that this node has not added them as
- * a reverse dep on this build. That set is normally {@link
- * NodeEntry#getAllRemainingDirtyDirectDeps()}, but in certain corner cases, like cycles,
- * further filtering may be needed.
- */
- private final Set<SkyKey> oldDeps;
- private SkyValue value = null;
- private ErrorInfo errorInfo = null;
- private final Map<SkyKey, ValueWithMetadata> bubbleErrorInfo;
- /** The values previously declared as dependencies. */
- private final Map<SkyKey, NodeEntry> directDeps;
-
- /**
- * The grouped list of values requested during this build as dependencies. On a subsequent
- * build, if this value is dirty, all deps in the same dependency group can be checked in
- * parallel for changes. In other words, if dep1 and dep2 are in the same group, then dep1 will
- * be checked in parallel with dep2. See {@link #getValues} for more.
- */
- private final GroupedListHelper<SkyKey> newlyRequestedDeps = new GroupedListHelper<>();
-
- /**
- * The value visitor managing the thread pool. Used to enqueue parents when this value is
- * finished, and, during testing, to block until an exception is thrown if a value builder
- * requests that.
- */
- private final ValueVisitor visitor;
-
- /** The set of errors encountered while fetching children. */
- private final Collection<ErrorInfo> childErrorInfos = new LinkedHashSet<>();
- private final StoredEventHandler eventHandler =
- new StoredEventHandler() {
- @Override
- @SuppressWarnings("UnsynchronizedOverridesSynchronized") // only delegates to thread-safe.
- public void handle(Event e) {
- checkActive();
- if (storedEventFilter.apply(e)) {
- super.handle(e);
- } else {
- reporter.handle(e);
- }
- }
- };
-
- private SkyFunctionEnvironment(
- SkyKey skyKey, GroupedList<SkyKey> directDeps, Set<SkyKey> oldDeps, ValueVisitor visitor)
- throws InterruptedException {
- this(skyKey, directDeps, null, oldDeps, visitor);
- }
-
- private SkyFunctionEnvironment(
- SkyKey skyKey,
- GroupedList<SkyKey> directDeps,
- @Nullable Map<SkyKey, ValueWithMetadata> bubbleErrorInfo,
- Set<SkyKey> oldDeps,
- ValueVisitor visitor)
- throws InterruptedException {
- this.skyKey = skyKey;
- this.oldDeps = oldDeps;
- this.directDeps = Collections.unmodifiableMap(batchPrefetch(
- skyKey, directDeps, oldDeps, /*assertDone=*/bubbleErrorInfo == null, skyKey));
- this.bubbleErrorInfo = bubbleErrorInfo;
- this.visitor = visitor;
- Preconditions.checkState(
- !this.directDeps.containsKey(ErrorTransienceValue.KEY),
- "%s cannot have a dep on ErrorTransienceValue during building",
- skyKey);
- }
-
- private Map<SkyKey, ? extends NodeEntry> batchPrefetch(
- SkyKey requestor,
- GroupedList<SkyKey> depKeys,
- Set<SkyKey> oldDeps,
- boolean assertDone,
- SkyKey keyForDebugging)
- throws InterruptedException {
- Iterable<SkyKey> depKeysAsIterable = Iterables.concat(depKeys);
- Iterable<SkyKey> keysToPrefetch = depKeysAsIterable;
- if (PREFETCH_OLD_DEPS) {
- ImmutableSet.Builder<SkyKey> keysToPrefetchBuilder = ImmutableSet.builder();
- keysToPrefetchBuilder.addAll(depKeysAsIterable).addAll(oldDeps);
- keysToPrefetch = keysToPrefetchBuilder.build();
- }
- Map<SkyKey, ? extends NodeEntry> batchMap =
- getBatchValues(requestor, Reason.PREFETCH, keysToPrefetch);
- if (PREFETCH_OLD_DEPS) {
- batchMap =
- ImmutableMap.<SkyKey, NodeEntry>copyOf(
- Maps.filterKeys(batchMap, Predicates.in(ImmutableSet.copyOf(depKeysAsIterable))));
- }
- if (batchMap.size() != depKeys.numElements()) {
- throw new IllegalStateException(
- "Missing keys for "
- + keyForDebugging
- + ": "
- + Sets.difference(depKeys.toSet(), batchMap.keySet()));
- }
- if (assertDone) {
- for (Map.Entry<SkyKey, ? extends NodeEntry> entry : batchMap.entrySet()) {
- Preconditions.checkState(
- entry.getValue().isDone(), "%s had not done %s", keyForDebugging, entry);
- }
- }
- return batchMap;
- }
-
- private void checkActive() {
- Preconditions.checkState(building, skyKey);
- }
-
- private NestedSet<TaggedEvents> buildEvents(NodeEntry entry, boolean missingChildren)
- throws InterruptedException {
- // Aggregate the nested set of events from the direct deps, also adding the events from
- // building this value.
- NestedSetBuilder<TaggedEvents> eventBuilder = NestedSetBuilder.stableOrder();
- ImmutableList<Event> events = eventHandler.getEvents();
- if (!events.isEmpty()) {
- eventBuilder.add(new TaggedEvents(getTagFromKey(), events));
- }
- if (storedEventFilter.storeEvents()) {
- // Only do the work of processing children if we're going to store events.
- GroupedList<SkyKey> depKeys = entry.getTemporaryDirectDeps();
- Collection<SkyValue> deps = getDepValuesForDoneNodeMaybeFromError(depKeys);
- if (!missingChildren && depKeys.numElements() != deps.size()) {
- throw new IllegalStateException(
- "Missing keys for "
- + skyKey
- + ". Present values: "
- + deps
- + "requested from: "
- + depKeys
- + ", "
- + entry);
- }
- for (SkyValue value : deps) {
- eventBuilder.addTransitive(ValueWithMetadata.getEvents(value));
- }
- }
- return eventBuilder.build();
- }
-
- /**
- * If this node has an error, that is, if errorInfo is non-null, do nothing. Otherwise, set
- * errorInfo to the union of the child errors that were recorded earlier by getValueOrException,
- * if there are any.
- *
- * <p>Child errors are remembered, if there are any and yet the parent recovered without
- * error, so that subsequent noKeepGoing evaluations can stop as soon as they encounter a
- * node whose (transitive) children had experienced an error, even if that (transitive)
- * parent node had been able to recover from it during a keepGoing build. This behavior can be
- * suppressed by setting {@link #storeErrorsAlongsideValues} to false, which will cause nodes
- * with values to have no stored error info. This may be useful if this graph will only ever be
- * used for keepGoing builds, since in that case storing errors from recovered nodes is
- * pointless.
- */
- private void finalizeErrorInfo() {
- if (errorInfo == null
- && (storeErrorsAlongsideValues || value == null)
- && !childErrorInfos.isEmpty()) {
- errorInfo = ErrorInfo.fromChildErrors(skyKey, childErrorInfos);
- }
- }
-
- private void setValue(SkyValue newValue) {
- Preconditions.checkState(errorInfo == null && bubbleErrorInfo == null,
- "%s %s %s %s", skyKey, newValue, errorInfo, bubbleErrorInfo);
- Preconditions.checkState(value == null, "%s %s %s", skyKey, value, newValue);
- value = newValue;
- }
-
- /**
- * Set this node to be in error. The node's value must not have already been set. However, all
- * dependencies of this node <i>must</i> already have been registered, since this method may
- * register a dependence on the error transience node, which should always be the last dep.
- */
- private void setError(NodeEntry state, ErrorInfo errorInfo, boolean isDirectlyTransient)
- throws InterruptedException {
- Preconditions.checkState(value == null, "%s %s %s", skyKey, value, errorInfo);
- Preconditions.checkState(this.errorInfo == null,
- "%s %s %s", skyKey, this.errorInfo, errorInfo);
-
- if (isDirectlyTransient) {
- NodeEntry errorTransienceNode =
- graph.get(skyKey, Reason.RDEP_ADDITION, ErrorTransienceValue.KEY);
- DependencyState triState;
- if (oldDeps.contains(ErrorTransienceValue.KEY)) {
- triState = errorTransienceNode.checkIfDoneForDirtyReverseDep(skyKey);
- } else {
- triState = errorTransienceNode.addReverseDepAndCheckIfDone(skyKey);
- }
- Preconditions.checkState(triState == DependencyState.DONE,
- "%s %s %s", skyKey, triState, errorInfo);
- state.addTemporaryDirectDeps(
- GroupedListHelper.create(ImmutableList.of(ErrorTransienceValue.KEY)));
- state.signalDep();
- }
-
- this.errorInfo = Preconditions.checkNotNull(errorInfo, skyKey);
- }
-
- /**
- * Returns a map from key to value for the requested {@code keys}, looking at {@link
- * #bubbleErrorInfo}, {@link #directDeps}, and the backing {@link #graph} in that order. Any
- * keys that are not yet done will be present in the map with the value {@link #NULL_MARKER}.
- * {@link ErrorTransienceValue#KEY} must not be present in {@code keys}.
- */
- private Map<SkyKey, SkyValue> getValuesMaybeFromError(Iterable<SkyKey> keys)
- throws InterruptedException {
- // Use a HashMap, not an ImmutableMap.Builder, because we have not yet deduplicated these keys
- // and ImmutableMap.Builder does not tolerate duplicates. The map will be thrown away
- // shortly in any case.
- Map<SkyKey, SkyValue> result = new HashMap<>();
- ArrayList<SkyKey> missingKeys = new ArrayList<>();
- for (SkyKey key : keys) {
- Preconditions.checkState(
- !key.equals(ErrorTransienceValue.KEY),
- "Error transience key cannot be in requested deps of %s",
- skyKey);
- SkyValue value = maybeGetValueFromErrorOrDeps(key);
- if (value == null) {
- missingKeys.add(key);
- } else {
- result.put(key, value);
- }
- }
- Map<SkyKey, ? extends NodeEntry> missingEntries =
- getBatchValues(skyKey, Reason.DEP_REQUESTED, missingKeys);
- for (SkyKey key : missingKeys) {
- result.put(key, getValueOrNullMarker(missingEntries.get(key)));
- }
- return result;
- }
-
- /**
- * Returns just the values of the deps in {@code depKeys}, looking at {@code bubbleErrorInfo},
- * {@link #directDeps}, and the backing {@link #graph} in that order. Any deps that are not yet
- * done will not have their values present in the returned collection.
- */
- private Collection<SkyValue> getDepValuesForDoneNodeMaybeFromError(GroupedList<SkyKey> depKeys)
- throws InterruptedException {
- int keySize = depKeys.numElements();
- List<SkyValue> result = new ArrayList<>(keySize);
- // depKeys consists of all known deps of this entry. That should include all the keys in
- // directDeps, and any keys in bubbleErrorInfo. We expect to have to retrieve the keys that
- // are not in either one.
- int expectedMissingKeySize =
- Math.max(
- keySize - directDeps.size() - (bubbleErrorInfo == null ? 0 : bubbleErrorInfo.size()),
- 0);
- ArrayList<SkyKey> missingKeys = new ArrayList<>(expectedMissingKeySize);
- for (SkyKey key : Iterables.concat(depKeys)) {
- SkyValue value = maybeGetValueFromErrorOrDeps(key);
- if (value == null) {
- missingKeys.add(key);
- } else {
- result.add(value);
- }
- }
- for (NodeEntry entry : getBatchValues(skyKey, Reason.DEP_REQUESTED, missingKeys).values()) {
- result.add(getValueOrNullMarker(entry));
- }
- return result;
- }
-
- @Nullable
- private SkyValue maybeGetValueFromErrorOrDeps(SkyKey key) throws InterruptedException {
- return maybeGetValueFromError(key, directDeps.get(key), bubbleErrorInfo);
- }
-
- @Override
- protected Map<SkyKey, ValueOrUntypedException> getValueOrUntypedExceptions(
- Iterable<SkyKey> depKeys) throws InterruptedException {
- checkActive();
- Map<SkyKey, SkyValue> values = getValuesMaybeFromError(depKeys);
- for (Map.Entry<SkyKey, SkyValue> depEntry : values.entrySet()) {
- SkyKey depKey = depEntry.getKey();
- SkyValue depValue = depEntry.getValue();
- if (depValue == NULL_MARKER) {
- if (directDeps.containsKey(depKey)) {
- throw new IllegalStateException(
- "Undone key "
- + depKey
- + " was already in deps of "
- + skyKey
- + "( dep: "
- + graph.get(skyKey, Reason.OTHER, depKey)
- + ", parent: "
- + graph.get(null, Reason.OTHER, skyKey));
- }
- valuesMissing = true;
- addDep(depKey);
- continue;
- }
- ErrorInfo errorInfo = ValueWithMetadata.getMaybeErrorInfo(depEntry.getValue());
- if (errorInfo != null) {
- childErrorInfos.add(errorInfo);
- if (bubbleErrorInfo != null) {
- // Set interrupted status, to try to prevent the calling SkyFunction from doing anything
- // fancy after this. SkyFunctions executed during error bubbling are supposed to
- // (quickly) rethrow errors or return a value/null (but there's currently no way to
- // enforce this).
- Thread.currentThread().interrupt();
- }
- if ((!keepGoing && bubbleErrorInfo == null) || errorInfo.getException() == null) {
- valuesMissing = true;
- // We arbitrarily record the first child error if we are about to abort.
- if (!keepGoing && depErrorKey == null) {
- depErrorKey = depKey;
- }
- }
- }
-
- if (!directDeps.containsKey(depKey)) {
- if (bubbleErrorInfo == null) {
- addDep(depKey);
- }
- replayingNestedSetEventVisitor.visit(ValueWithMetadata.getEvents(depValue));
- }
- }
-
- return Maps.transformValues(
- values,
- new Function<SkyValue, ValueOrUntypedException>() {
- @Override
- public ValueOrUntypedException apply(SkyValue maybeWrappedValue) {
- if (maybeWrappedValue == NULL_MARKER) {
- return ValueOrExceptionUtils.ofNull();
- }
- SkyValue justValue = ValueWithMetadata.justValue(maybeWrappedValue);
- ErrorInfo errorInfo = ValueWithMetadata.getMaybeErrorInfo(maybeWrappedValue);
-
- if (justValue != null && (keepGoing || errorInfo == null)) {
- // If the dep did compute a value, it is given to the caller if we are in
- // keepGoing mode or if we are in noKeepGoingMode and there were no errors computing
- // it.
- return ValueOrExceptionUtils.ofValueUntyped(justValue);
- }
-
- // There was an error building the value, which we will either report by throwing an
- // exception or insulate the caller from by returning null.
- Preconditions.checkNotNull(errorInfo, "%s %s", skyKey, maybeWrappedValue);
- Exception exception = errorInfo.getException();
-
- if (!keepGoing && exception != null && bubbleErrorInfo == null) {
- // Child errors should not be propagated in noKeepGoing mode (except during error
- // bubbling). Instead we should fail fast.
- return ValueOrExceptionUtils.ofNull();
- }
-
- if (exception != null) {
- // Give builder a chance to handle this exception.
- return ValueOrExceptionUtils.ofExn(exception);
- }
- // In a cycle.
- Preconditions.checkState(
- !Iterables.isEmpty(errorInfo.getCycleInfo()),
- "%s %s %s",
- skyKey,
- errorInfo,
- maybeWrappedValue);
- return ValueOrExceptionUtils.ofNull();
- }
- });
- }
-
- @Override
- public <
- E1 extends Exception,
- E2 extends Exception,
- E3 extends Exception,
- E4 extends Exception,
- E5 extends Exception>
- Map<SkyKey, ValueOrException5<E1, E2, E3, E4, E5>> getValuesOrThrow(
- Iterable<SkyKey> depKeys,
- Class<E1> exceptionClass1,
- Class<E2> exceptionClass2,
- Class<E3> exceptionClass3,
- Class<E4> exceptionClass4,
- Class<E5> exceptionClass5)
- throws InterruptedException {
- newlyRequestedDeps.startGroup();
- Map<SkyKey, ValueOrException5<E1, E2, E3, E4, E5>> result = super.getValuesOrThrow(
- depKeys,
- exceptionClass1,
- exceptionClass2,
- exceptionClass3,
- exceptionClass4,
- exceptionClass5);
- newlyRequestedDeps.endGroup();
- return result;
- }
-
- private void addDep(SkyKey key) {
- if (!newlyRequestedDeps.contains(key)) {
- // dep may have been requested already this evaluation. If not, add it.
- newlyRequestedDeps.add(key);
- }
- }
-
- /**
- * If {@code !keepGoing} and there is at least one dep in error, returns a dep in error.
- * Otherwise returns {@code null}.
- */
- @Nullable
- private SkyKey getDepErrorKey() {
- return depErrorKey;
- }
-
- @Override
- public EventHandler getListener() {
- checkActive();
- return eventHandler;
- }
-
- private void doneBuilding() {
- building = false;
- }
-
- /**
- * Apply the change to the graph (mostly) atomically and signal all nodes that are waiting for
- * this node to complete. Adding nodes and signaling is not atomic, but may need to be changed
- * for interruptibility.
- *
- * <p>Parents are only enqueued if {@code enqueueParents} holds. Parents should be enqueued
- * unless (1) this node is being built after the main evaluation has aborted, or (2) this node
- * is being built with --nokeep_going, and so we are about to shut down the main evaluation
- * anyway.
- *
- * <p>The node entry is informed if the node's value and error are definitive via the flag
- * {@code completeValue}.
- */
- void commit(NodeEntry primaryEntry, boolean enqueueParents) throws InterruptedException {
- // Construct the definitive error info, if there is one.
- finalizeErrorInfo();
-
- // We have the following implications:
- // errorInfo == null => value != null => enqueueParents.
- // All these implications are strict:
- // (1) errorInfo != null && value != null happens for values with recoverable errors.
- // (2) value == null && enqueueParents happens for values that are found to have errors
- // during a --keep_going build.
-
- NestedSet<TaggedEvents> events = buildEvents(primaryEntry, /*missingChildren=*/false);
- Version valueVersion;
- SkyValue valueWithMetadata;
- if (value == null) {
- Preconditions.checkNotNull(errorInfo, "%s %s", skyKey, primaryEntry);
- valueWithMetadata = ValueWithMetadata.error(errorInfo, events);
- } else {
- // We must be enqueueing parents if we have a value.
- Preconditions.checkState(enqueueParents, "%s %s", skyKey, primaryEntry);
- valueWithMetadata = ValueWithMetadata.normal(value, errorInfo, events);
- }
- if (!oldDeps.isEmpty()) {
- // Remove the rdep on this entry for each of its old deps that is no longer a direct dep.
- Set<SkyKey> depsToRemove =
- Sets.difference(oldDeps, primaryEntry.getTemporaryDirectDeps().toSet());
- Collection<? extends NodeEntry> oldDepEntries =
- graph.getBatch(skyKey, Reason.RDEP_REMOVAL, depsToRemove).values();
- for (NodeEntry oldDepEntry : oldDepEntries) {
- oldDepEntry.removeReverseDep(skyKey);
- }
+ private Function<SkyKey, Runnable> createEvaluateRunnable() {
+ return new Function<SkyKey, Runnable>() {
+ @Override
+ public Runnable apply(SkyKey skyKey) {
+ return new Evaluate(skyKey);
}
- // If this entry is dirty, setValue may not actually change it, if it determines that
- // the data being written now is the same as the data already present in the entry.
- // We could consider using max(childVersions) here instead of graphVersion. When full
- // versioning is implemented, this would allow evaluation at a version between
- // max(childVersions) and graphVersion to re-use this result.
- Set<SkyKey> reverseDeps = primaryEntry.setValue(valueWithMetadata, graphVersion);
- // Note that if this update didn't actually change the value entry, this version may not
- // be the graph version.
- valueVersion = primaryEntry.getVersion();
- Preconditions.checkState(valueVersion.atMost(graphVersion),
- "%s should be at most %s in the version partial ordering",
- valueVersion, graphVersion);
- if (progressReceiver != null) {
- // Tell the receiver that this value was built. If valueVersion.equals(graphVersion), it
- // was evaluated this run, and so was changed. Otherwise, it is less than graphVersion,
- // by the Preconditions check above, and was not actually changed this run -- when it was
- // written above, its version stayed below this update's version, so its value remains the
- // same as before.
- // We use a SkyValueSupplier here because it keeps a reference to the entry, allowing for
- // the receiver to be confident that the entry is readily accessible in memory.
- progressReceiver.evaluated(skyKey, new SkyValueSupplier(primaryEntry),
- valueVersion.equals(graphVersion) ? EvaluationState.BUILT : EvaluationState.CLEAN);
- }
- signalValuesAndEnqueueIfReady(
- enqueueParents ? visitor : null,
- skyKey,
- reverseDeps,
- valueVersion);
-
- visitor.notifyDone(skyKey);
- replayingNestedSetEventVisitor.visit(events);
- }
-
- @Nullable
- private String getTagFromKey() {
- return skyFunctions.get(skyKey.functionName()).extractTag(skyKey);
- }
-
- /**
- * Gets the latch that is counted down when an exception is thrown in {@code
- * AbstractQueueVisitor}. For use in tests to check if an exception actually was thrown. Calling
- * {@code AbstractQueueVisitor#awaitExceptionForTestingOnly} can throw a spurious {@link
- * InterruptedException} because {@link CountDownLatch#await} checks the interrupted bit before
- * returning, even if the latch is already at 0. See bug "testTwoErrors is flaky".
- */
- CountDownLatch getExceptionLatchForTesting() {
- return visitor.getExceptionLatchForTestingOnly();
- }
-
- @Override
- public boolean inErrorBubblingForTesting() {
- return bubbleErrorInfo != null;
- }
- }
-
- private static final ErrorClassifier VALUE_VISITOR_ERROR_CLASSIFIER =
- new ErrorClassifier() {
- @Override
- protected ErrorClassification classifyException(Exception e) {
- if (e instanceof SchedulerException) {
- return ErrorClassification.CRITICAL;
- }
- if (e instanceof RuntimeException) {
- // We treat non-SchedulerException RuntimeExceptions as more severe than
- // SchedulerExceptions so that AbstractQueueVisitor will propagate instances of the
- // former. They indicate actual Blaze bugs, rather than normal Skyframe evaluation
- // control flow.
- return ErrorClassification.CRITICAL_AND_LOG;
- }
- return ErrorClassification.NOT_CRITICAL;
- }
- };
-
- private class ValueVisitor {
-
- private final QuiescingExecutor quiescingExecutor;
- private final AtomicBoolean preventNewEvaluations = new AtomicBoolean(false);
- private final Set<SkyKey> inflightNodes = Sets.newConcurrentHashSet();
- private final Set<RuntimeException> crashes = Sets.newConcurrentHashSet();
-
- private ValueVisitor(ForkJoinPool forkJoinPool) {
- quiescingExecutor =
- new ForkJoinQuiescingExecutor(forkJoinPool, VALUE_VISITOR_ERROR_CLASSIFIER);
- }
-
- private ValueVisitor(int threadCount) {
- quiescingExecutor =
- new AbstractQueueVisitor(
- /*concurrent*/ true,
- threadCount,
- /*keepAliveTime=*/ 1,
- TimeUnit.SECONDS,
- /*failFastOnException*/ true,
- "skyframe-evaluator",
- VALUE_VISITOR_ERROR_CLASSIFIER);
- }
-
- private void waitForCompletion() throws InterruptedException {
- quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true);
- }
-
- private void enqueueEvaluation(SkyKey key) {
- // We unconditionally add the key to the set of in-flight nodes because even if evaluation is
- // never scheduled we still want to remove the previously created NodeEntry from the graph.
- // Otherwise we would leave the graph in a weird state (wasteful garbage in the best case and
- // inconsistent in the worst case).
- boolean newlyEnqueued = inflightNodes.add(key);
- // All nodes enqueued for evaluation will be either verified clean, re-evaluated, or cleaned
- // up after being in-flight when an error happens in nokeep_going mode or in the event of an
- // interrupt. In any of these cases, they won't be dirty anymore.
- if (newlyEnqueued) {
- dirtyKeyTracker.notDirty(key);
- }
- if (preventNewEvaluations.get()) {
- return;
- }
- if (newlyEnqueued && progressReceiver != null) {
- progressReceiver.enqueueing(key);
- }
- quiescingExecutor.execute(new Evaluate(this, key));
- }
-
- /**
- * Stop any new evaluations from being enqueued. Returns whether this was the first thread to
- * request a halt. If true, this thread should proceed to throw an exception. If false, another
- * thread already requested a halt and will throw an exception, and so this thread can simply
- * end.
- */
- private boolean preventNewEvaluations() {
- return preventNewEvaluations.compareAndSet(false, true);
- }
-
- private void noteCrash(RuntimeException e) {
- crashes.add(e);
- }
-
- private Collection<RuntimeException> getCrashes() {
- return crashes;
- }
-
- private void notifyDone(SkyKey key) {
- inflightNodes.remove(key);
- }
-
- private boolean isInflight(SkyKey key) {
- return inflightNodes.contains(key);
- }
-
- @VisibleForTesting
- private CountDownLatch getExceptionLatchForTestingOnly() {
- return quiescingExecutor.getExceptionLatchForTestingOnly();
- }
+ };
}
/**
@@ -893,12 +201,10 @@ public final class ParallelEvaluator implements Evaluator {
* An action that evaluates a value.
*/
private class Evaluate implements Runnable {
- private final ValueVisitor visitor;
/** The name of the value to be evaluated. */
private final SkyKey skyKey;
- private Evaluate(ValueVisitor visitor, SkyKey skyKey) {
- this.visitor = visitor;
+ private Evaluate(SkyKey skyKey) {
this.skyKey = skyKey;
}
@@ -917,13 +223,13 @@ public final class ParallelEvaluator implements Evaluator {
case DONE:
if (entry.signalDep(childEntry.getVersion())) {
// This can only happen if there are no more children to be added.
- visitor.enqueueEvaluation(skyKey);
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
}
break;
case ALREADY_EVALUATING:
break;
case NEEDS_SCHEDULING:
- visitor.enqueueEvaluation(child);
+ evaluatorContext.getVisitor().enqueueEvaluation(child);
break;
}
}
@@ -975,7 +281,7 @@ public final class ParallelEvaluator implements Evaluator {
skyKey, Reason.RDEP_REMOVAL, ErrorTransienceValue.KEY).removeReverseDep(skyKey);
return DirtyOutcome.NEEDS_EVALUATION;
}
- if (!keepGoing) {
+ if (!evaluatorContext.keepGoing()) {
// This check ensures that we maintain the invariant that if a node with an error is
// reached during a no-keep-going build, none of its currently building parents
// finishes building. If the child isn't done building yet, it will detect on its own
@@ -999,7 +305,7 @@ public final class ParallelEvaluator implements Evaluator {
depEntry.getValue().removeReverseDep(skyKey);
}
}
- if (!visitor.preventNewEvaluations()) {
+ if (!evaluatorContext.getVisitor().preventNewEvaluations()) {
// An error was already thrown in the evaluator. Don't do anything here.
return DirtyOutcome.ALREADY_PROCESSED;
}
@@ -1030,19 +336,22 @@ public final class ParallelEvaluator implements Evaluator {
case VERIFIED_CLEAN:
// No child has a changed value. This node can be marked done and its parents signaled
// without any re-evaluation.
- visitor.notifyDone(skyKey);
+ evaluatorContext.getVisitor().notifyDone(skyKey);
Set<SkyKey> reverseDeps = state.markClean();
- if (progressReceiver != null) {
+ if (evaluatorContext.getProgressReceiver() != null) {
// Tell the receiver that the value was not actually changed this run.
- progressReceiver.evaluated(skyKey, new SkyValueSupplier(state), EvaluationState.CLEAN);
+ evaluatorContext
+ .getProgressReceiver()
+ .evaluated(skyKey, new SkyValueSupplier(state), EvaluationState.CLEAN);
}
- if (!keepGoing && state.getErrorInfo() != null) {
- if (!visitor.preventNewEvaluations()) {
+ if (!evaluatorContext.keepGoing() && state.getErrorInfo() != null) {
+ if (!evaluatorContext.getVisitor().preventNewEvaluations()) {
return DirtyOutcome.ALREADY_PROCESSED;
}
throw SchedulerException.ofError(state.getErrorInfo(), skyKey);
}
- signalValuesAndEnqueueIfReady(visitor, skyKey, reverseDeps, state.getVersion());
+ evaluatorContext.signalValuesAndEnqueueIfReady(
+ skyKey, reverseDeps, state.getVersion(), /*enqueueParents=*/ true);
return DirtyOutcome.ALREADY_PROCESSED;
case NEEDS_REBUILDING:
maybeMarkRebuilding(state);
@@ -1066,12 +375,17 @@ public final class ParallelEvaluator implements Evaluator {
}
Set<SkyKey> oldDeps = state.getAllRemainingDirtyDirectDeps();
- SkyFunctionEnvironment env =
- new SkyFunctionEnvironment(skyKey, state.getTemporaryDirectDeps(), oldDeps, visitor);
+ SkyFunctionEnvironment env =
+ new SkyFunctionEnvironment(
+ skyKey, state.getTemporaryDirectDeps(), oldDeps, evaluatorContext);
SkyFunctionName functionName = skyKey.functionName();
- SkyFunction factory = skyFunctions.get(functionName);
- Preconditions.checkState(factory != null,
- "Unable to find SkyFunction '%s' for node with key %s, %s", functionName, skyKey, state);
+ SkyFunction factory =
+ Preconditions.checkNotNull(
+ evaluatorContext.getSkyFunctions().get(functionName),
+ "Unable to find SkyFunction '%s' for node with key %s, %s",
+ functionName,
+ skyKey,
+ state);
SkyValue value = null;
long startTime = BlazeClock.instance().nanoTime();
@@ -1085,25 +399,27 @@ public final class ParallelEvaluator implements Evaluator {
// graph structure, thus avoiding non-determinism. It's completely reasonable for
// SkyFunctions to throw eagerly because they do not know if they are in keep-going mode.
// Propagated transitive errors are treated the same as missing deps.
- if ((!keepGoing || !env.valuesMissing())
+ if ((!evaluatorContext.keepGoing() || !env.valuesMissing())
&& reifiedBuilderException.getRootCauseSkyKey().equals(skyKey)) {
- boolean shouldFailFast = !keepGoing || builderException.isCatastrophic();
+ boolean shouldFailFast =
+ !evaluatorContext.keepGoing() || builderException.isCatastrophic();
if (shouldFailFast) {
- // After we commit this error to the graph but before the eval call completes with the
- // error there is a race-like opportunity for the error to be used, either by an
- // in-flight computation or by a future computation.
- if (!visitor.preventNewEvaluations()) {
+ // After we commit this error to the graph but before the doMutatingEvaluation call
+ // completes with the error there is a race-like opportunity for the error to be used,
+ // either by an in-flight computation or by a future computation.
+ if (!evaluatorContext.getVisitor().preventNewEvaluations()) {
// This is not the first error encountered, so we ignore it so that we can terminate
// with the first error.
return;
}
}
- Map<SkyKey, ? extends NodeEntry> newlyRequestedDeps =
- getBatchValues(skyKey, Reason.RDEP_ADDITION, env.newlyRequestedDeps);
+ Map<SkyKey, ? extends NodeEntry> newlyRequestedDeps =
+ evaluatorContext.getBatchValues(
+ skyKey, Reason.RDEP_ADDITION, env.getNewlyRequestedDeps());
boolean isTransitivelyTransient = reifiedBuilderException.isTransient();
- for (NodeEntry depEntry
- : Iterables.concat(env.directDeps.values(), newlyRequestedDeps.values())) {
+ for (NodeEntry depEntry :
+ Iterables.concat(env.getDirectDepsValues(), newlyRequestedDeps.values())) {
if (!isDoneForBuild(depEntry)) {
continue;
}
@@ -1119,7 +435,7 @@ public final class ParallelEvaluator implements Evaluator {
state,
errorInfo,
/*isDirectlyTransient=*/ reifiedBuilderException.isTransient());
- env.commit(state, /*enqueueParents=*/keepGoing);
+ env.commit(state, /*enqueueParents=*/ evaluatorContext.keepGoing());
if (!shouldFailFast) {
return;
}
@@ -1130,21 +446,21 @@ public final class ParallelEvaluator implements Evaluator {
// some context together with the exception.
String msg = prepareCrashMessage(skyKey, state.getInProgressReverseDeps());
RuntimeException ex = new RuntimeException(msg, re);
- visitor.noteCrash(ex);
+ evaluatorContext.getVisitor().noteCrash(ex);
throw ex;
} finally {
env.doneBuilding();
long elapsedTimeNanos = BlazeClock.instance().nanoTime() - startTime;
if (elapsedTimeNanos > 0) {
- if (progressReceiver != null) {
- progressReceiver.computed(skyKey, elapsedTimeNanos);
+ if (evaluatorContext.getProgressReceiver() != null) {
+ evaluatorContext.getProgressReceiver().computed(skyKey, elapsedTimeNanos);
}
Profiler.instance().logSimpleTaskDuration(startTime, elapsedTimeNanos,
ProfilerTask.SKYFUNCTION, skyKey);
}
}
- GroupedListHelper<SkyKey> newDirectDeps = env.newlyRequestedDeps;
+ GroupedListHelper<SkyKey> newDirectDeps = env.getNewlyRequestedDeps();
if (value != null) {
Preconditions.checkState(!env.valuesMissing(), "Evaluation of %s returned non-null value "
@@ -1154,7 +470,7 @@ public final class ParallelEvaluator implements Evaluator {
registerNewlyDiscoveredDepsForDoneEntry(
skyKey,
state,
- graph.getBatch(skyKey, Reason.RDEP_ADDITION, env.newlyRequestedDeps),
+ graph.getBatch(skyKey, Reason.RDEP_ADDITION, env.getNewlyRequestedDeps()),
oldDeps,
env);
env.commit(state, /*enqueueParents=*/true);
@@ -1162,7 +478,8 @@ public final class ParallelEvaluator implements Evaluator {
}
if (env.getDepErrorKey() != null) {
- Preconditions.checkState(!keepGoing, "%s %s %s", skyKey, state, env.getDepErrorKey());
+ Preconditions.checkState(
+ !evaluatorContext.keepGoing(), "%s %s %s", skyKey, state, env.getDepErrorKey());
// We encountered a child error in noKeepGoing mode, so we want to fail fast. But we first
// need to add the edge between the current node and the child error it requested so that
// error bubbling can occur. Note that this edge will subsequently be removed during graph
@@ -1194,7 +511,7 @@ public final class ParallelEvaluator implements Evaluator {
childErrorEntry);
}
ErrorInfo childErrorInfo = Preconditions.checkNotNull(childErrorEntry.getErrorInfo());
- visitor.preventNewEvaluations();
+ evaluatorContext.getVisitor().preventNewEvaluations();
throw SchedulerException.ofError(childErrorInfo, childErrorKey);
}
@@ -1212,14 +529,17 @@ public final class ParallelEvaluator implements Evaluator {
// just order it to be built.
if (newDirectDeps.isEmpty()) {
// TODO(bazel-team): This means a bug in the SkyFunction. What to do?
- Preconditions.checkState(!env.childErrorInfos.isEmpty(),
- "Evaluation of SkyKey failed and no dependencies were requested: %s %s", skyKey, state);
Preconditions.checkState(
- keepGoing,
+ !env.getChildErrorInfos().isEmpty(),
+ "Evaluation of SkyKey failed and no dependencies were requested: %s %s",
+ skyKey,
+ state);
+ Preconditions.checkState(
+ evaluatorContext.keepGoing(),
"nokeep_going evaluation should have failed on first child error: %s %s %s",
skyKey,
state,
- env.childErrorInfos);
+ env.getChildErrorInfos());
// If the child error was catastrophic, committing this parent to the graph is not
// necessary, but since we don't do error bubbling in catastrophes, it doesn't violate any
// invariants either.
@@ -1271,36 +591,6 @@ public final class ParallelEvaluator implements Evaluator {
}
/**
- * Signals all parents that this node is finished. If visitor is not null, also enqueues any
- * parents that are ready. If visitor is null, indicating that we are building this node after the
- * main build aborted, then skip any parents that are already done (that can happen with cycles).
- */
- private void signalValuesAndEnqueueIfReady(
- @Nullable ValueVisitor visitor, SkyKey skyKey, Iterable<SkyKey> keys, Version version)
- throws InterruptedException {
- // No fields of the entry are needed here, since we're just enqueuing for evaluation, but more
- // importantly, these hints are not respected for not-done nodes. If they are, we may need to
- // alter this hint.
- Map<SkyKey, ? extends NodeEntry> batch = graph.getBatch(skyKey, Reason.SIGNAL_DEP, keys);
- if (visitor != null) {
- for (SkyKey key : keys) {
- NodeEntry entry = Preconditions.checkNotNull(batch.get(key), key);
- if (entry.signalDep(version)) {
- visitor.enqueueEvaluation(key);
- }
- }
- } else {
- for (SkyKey key : keys) {
- NodeEntry entry = Preconditions.checkNotNull(batch.get(key), key);
- if (!entry.isDone()) {
- // In cycles, we can have parents that are already done.
- entry.signalDep(version);
- }
- }
- }
- }
-
- /**
* If child is not done, removes {@param inProgressParent} from {@param child}'s reverse deps.
* Returns whether child should be removed from inProgressParent's entry's direct deps.
*/
@@ -1329,14 +619,14 @@ public final class ParallelEvaluator implements Evaluator {
Set<SkyKey> oldDeps,
SkyFunctionEnvironment env) {
Set<SkyKey> unfinishedDeps = new HashSet<>();
- for (SkyKey dep : env.newlyRequestedDeps) {
+ for (SkyKey dep : env.getNewlyRequestedDeps()) {
if (!isDoneForBuild(newlyRequestedDepMap.get(dep))) {
unfinishedDeps.add(dep);
}
}
- env.newlyRequestedDeps.remove(unfinishedDeps);
- entry.addTemporaryDirectDeps(env.newlyRequestedDeps);
- for (SkyKey newDep : env.newlyRequestedDeps) {
+ env.getNewlyRequestedDeps().remove(unfinishedDeps);
+ entry.addTemporaryDirectDeps(env.getNewlyRequestedDeps());
+ for (SkyKey newDep : env.getNewlyRequestedDeps()) {
// Note that this depEntry can't be null. If env.newlyRequestedDeps contained a key with a
// null entry, then it would have been added to unfinishedDeps and then removed from
// env.newlyRequestedDeps just above this loop.
@@ -1350,24 +640,33 @@ public final class ParallelEvaluator implements Evaluator {
newDep, skyKey, entry, depEntry);
entry.signalDep();
}
- Preconditions.checkState(entry.isReady(), "%s %s %s", skyKey, entry, env.newlyRequestedDeps);
+ Preconditions.checkState(
+ entry.isReady(), "%s %s %s", skyKey, entry, env.getNewlyRequestedDeps());
}
private void informProgressReceiverThatValueIsDone(SkyKey key, NodeEntry entry)
throws InterruptedException {
- if (progressReceiver != null) {
+ if (evaluatorContext.getProgressReceiver() != null) {
Preconditions.checkState(entry.isDone(), entry);
SkyValue value = entry.getValue();
Version valueVersion = entry.getVersion();
- Preconditions.checkState(valueVersion.atMost(graphVersion),
- "%s should be at most %s in the version partial ordering", valueVersion, graphVersion);
+ Preconditions.checkState(
+ valueVersion.atMost(evaluatorContext.getGraphVersion()),
+ "%s should be at most %s in the version partial ordering",
+ valueVersion,
+ evaluatorContext.getGraphVersion());
// For most nodes we do not inform the progress receiver if they were already done when we
// retrieve them, but top-level nodes are presumably of more interest.
// If valueVersion is not equal to graphVersion, it must be less than it (by the
// Preconditions check above), and so the node is clean.
- progressReceiver.evaluated(key, Suppliers.ofInstance(value), valueVersion.equals(graphVersion)
- ? EvaluationState.BUILT
- : EvaluationState.CLEAN);
+ evaluatorContext
+ .getProgressReceiver()
+ .evaluated(
+ key,
+ Suppliers.ofInstance(value),
+ valueVersion.equals(evaluatorContext.getGraphVersion())
+ ? EvaluationState.BUILT
+ : EvaluationState.CLEAN);
}
}
@@ -1382,7 +681,7 @@ public final class ParallelEvaluator implements Evaluator {
// Inform progressReceiver that these nodes are done to be consistent with the main code path.
boolean allAreDone = true;
Map<SkyKey, ? extends NodeEntry> batch =
- getBatchValues(null, Reason.PRE_OR_POST_EVALUATION, skyKeySet);
+ evaluatorContext.getBatchValues(null, Reason.PRE_OR_POST_EVALUATION, skyKeySet);
for (SkyKey key : skyKeySet) {
if (!isDoneForBuild(batch.get(key))) {
allAreDone = false;
@@ -1395,10 +694,10 @@ public final class ParallelEvaluator implements Evaluator {
}
// Note that the 'catastrophe' parameter doesn't really matter here (it's only used for
// sanity checking).
- return constructResult(null, skyKeySet, null, /*catastrophe=*/false);
+ return constructResult(skyKeySet, null, /*catastrophe=*/ false);
}
- if (!keepGoing) {
+ if (!evaluatorContext.keepGoing()) {
Set<SkyKey> cachedErrorKeys = new HashSet<>();
for (SkyKey skyKey : skyKeySet) {
NodeEntry entry = graph.get(null, Reason.PRE_OR_POST_EVALUATION, skyKey);
@@ -1415,7 +714,7 @@ public final class ParallelEvaluator implements Evaluator {
if (!cachedErrorKeys.isEmpty()) {
// Note that the 'catastrophe' parameter doesn't really matter here (it's only used for
// sanity checking).
- return constructResult(null, cachedErrorKeys, null, /*catastrophe=*/false);
+ return constructResult(cachedErrorKeys, null, /*catastrophe=*/ false);
}
}
@@ -1423,7 +722,7 @@ public final class ParallelEvaluator implements Evaluator {
// and !keepsEdges are incompatible only in the case of a failed evaluation -- there is no
// need to be overly harsh to callers who are just trying to retrieve a cached result.
Preconditions.checkState(
- keepGoing
+ evaluatorContext.keepGoing()
|| !(graph instanceof InMemoryGraphImpl)
|| ((InMemoryGraphImpl) graph).keepsEdges(),
"nokeep_going evaluations are not allowed if graph edges are not kept: %s",
@@ -1431,17 +730,15 @@ public final class ParallelEvaluator implements Evaluator {
Profiler.instance().startTask(ProfilerTask.SKYFRAME_EVAL, skyKeySet);
try {
- ValueVisitor valueVisitor =
- forkJoinPool == null ? new ValueVisitor(threadCount) : new ValueVisitor(forkJoinPool);
- return eval(skyKeySet, valueVisitor);
+ return doMutatingEvaluation(skyKeySet);
} finally {
Profiler.instance().completeTask(ProfilerTask.SKYFRAME_EVAL);
}
}
@ThreadCompatible
- private <T extends SkyValue> EvaluationResult<T> eval(ImmutableSet<SkyKey> skyKeys,
- ValueVisitor visitor) throws InterruptedException {
+ private <T extends SkyValue> EvaluationResult<T> doMutatingEvaluation(
+ ImmutableSet<SkyKey> skyKeys) throws InterruptedException {
// We unconditionally add the ErrorTransienceValue here, to ensure that it will be created, and
// in the graph, by the time that it is needed. Creating it on demand in a parallel context sets
// up a race condition, because there is no way to atomically create a node and set its value.
@@ -1452,7 +749,7 @@ public final class ParallelEvaluator implements Evaluator {
if (!errorTransienceEntry.isDone()) {
injectValues(
ImmutableMap.of(ErrorTransienceValue.KEY, (SkyValue) ErrorTransienceValue.INSTANCE),
- graphVersion,
+ evaluatorContext.getGraphVersion(),
graph,
dirtyKeyTracker);
}
@@ -1463,7 +760,7 @@ public final class ParallelEvaluator implements Evaluator {
// This must be equivalent to the code in enqueueChild above, in order to be thread-safe.
switch (entry.addReverseDepAndCheckIfDone(null)) {
case NEEDS_SCHEDULING:
- visitor.enqueueEvaluation(skyKey);
+ evaluatorContext.getVisitor().enqueueEvaluation(skyKey);
break;
case DONE:
informProgressReceiverThatValueIsDone(skyKey, entry);
@@ -1475,22 +772,24 @@ public final class ParallelEvaluator implements Evaluator {
}
}
try {
- return waitForCompletionAndConstructResult(visitor, skyKeys);
+ return waitForCompletionAndConstructResult(skyKeys);
} finally {
- inflightKeysReceiver.accept(visitor.inflightNodes);
+ inflightKeysReceiver.accept(evaluatorContext.getVisitor().getInflightNodes());
}
}
private <T extends SkyValue> EvaluationResult<T> waitForCompletionAndConstructResult(
- ValueVisitor visitor, Iterable<SkyKey> skyKeys) throws InterruptedException {
+ Iterable<SkyKey> skyKeys) throws InterruptedException {
Map<SkyKey, ValueWithMetadata> bubbleErrorInfo = null;
boolean catastrophe = false;
try {
- visitor.waitForCompletion();
+ evaluatorContext.getVisitor().waitForCompletion();
} catch (final SchedulerException e) {
- if (!visitor.getCrashes().isEmpty()) {
- reporter.handle(Event.error("Crashes detected: " + visitor.getCrashes()));
- throw Iterables.getFirst(visitor.getCrashes(), null);
+ if (!evaluatorContext.getVisitor().getCrashes().isEmpty()) {
+ evaluatorContext
+ .getReporter()
+ .handle(Event.error("Crashes detected: " + evaluatorContext.getVisitor().getCrashes()));
+ throw Iterables.getFirst(evaluatorContext.getVisitor().getCrashes(), null);
}
Throwables.propagateIfPossible(e.getCause(), InterruptedException.class);
if (Thread.interrupted()) {
@@ -1505,8 +804,8 @@ public final class ParallelEvaluator implements Evaluator {
// ErrorInfo could only be null if SchedulerException wrapped an InterruptedException, but
// that should have been propagated.
ErrorInfo errorInfo = Preconditions.checkNotNull(e.getErrorInfo(), errorKey);
- if (!keepGoing) {
- bubbleErrorInfo = bubbleErrorUp(errorInfo, errorKey, skyKeys, visitor);
+ if (!evaluatorContext.keepGoing()) {
+ bubbleErrorInfo = bubbleErrorUp(errorInfo, errorKey, skyKeys);
} else {
Preconditions.checkState(
errorInfo.isCatastrophic(),
@@ -1524,11 +823,13 @@ public final class ParallelEvaluator implements Evaluator {
graph.get(null, Reason.ERROR_BUBBLING, errorKey).getValueMaybeWithMetadata()));
}
}
- Preconditions.checkState(visitor.getCrashes().isEmpty(), visitor.getCrashes());
+ Preconditions.checkState(
+ evaluatorContext.getVisitor().getCrashes().isEmpty(),
+ evaluatorContext.getVisitor().getCrashes());
// Successful evaluation, either because keepGoing or because we actually did succeed.
// TODO(bazel-team): Maybe report root causes during the build for lower latency.
- return constructResult(visitor, skyKeys, bubbleErrorInfo, catastrophe);
+ return constructResult(skyKeys, bubbleErrorInfo, catastrophe);
}
/**
@@ -1570,7 +871,7 @@ public final class ParallelEvaluator implements Evaluator {
* because during this process we can add useful information about error from other nodes.
*/
private Map<SkyKey, ValueWithMetadata> bubbleErrorUp(
- final ErrorInfo leafFailure, SkyKey errorKey, Iterable<SkyKey> skyKeys, ValueVisitor visitor)
+ final ErrorInfo leafFailure, SkyKey errorKey, Iterable<SkyKey> skyKeys)
throws InterruptedException {
Set<SkyKey> rootValues = ImmutableSet.copyOf(skyKeys);
ErrorInfo error = leafFailure;
@@ -1609,23 +910,35 @@ public final class ParallelEvaluator implements Evaluator {
Preconditions.checkState(errorEntry.isDone(), "%s %s", errorEntry, bubbleParentEntry);
Version parentVersion = bubbleParentEntry.getVersion();
Version childVersion = errorEntry.getVersion();
- Preconditions.checkState(childVersion.atMost(graphVersion)
- && !childVersion.equals(graphVersion),
+ Preconditions.checkState(
+ childVersion.atMost(evaluatorContext.getGraphVersion())
+ && !childVersion.equals(evaluatorContext.getGraphVersion()),
"child entry is not older than the current graph version, but had a done parent. "
- + "child: %s childEntry: %s, childVersion: %s"
- + "bubbleParent: %s bubbleParentEntry: %s, parentVersion: %s, graphVersion: %s",
- errorKey, errorEntry, childVersion,
- bubbleParent, bubbleParentEntry, parentVersion, graphVersion);
- Preconditions.checkState(parentVersion.atMost(graphVersion)
- && !parentVersion.equals(graphVersion),
+ + "child: %s childEntry: %s, childVersion: %s"
+ + "bubbleParent: %s bubbleParentEntry: %s, parentVersion: %s, graphVersion: %s",
+ errorKey,
+ errorEntry,
+ childVersion,
+ bubbleParent,
+ bubbleParentEntry,
+ parentVersion,
+ evaluatorContext.getGraphVersion());
+ Preconditions.checkState(
+ parentVersion.atMost(evaluatorContext.getGraphVersion())
+ && !parentVersion.equals(evaluatorContext.getGraphVersion()),
"parent entry is not older than the current graph version. "
- + "child: %s childEntry: %s, childVersion: %s"
- + "bubbleParent: %s bubbleParentEntry: %s, parentVersion: %s, graphVersion: %s",
- errorKey, errorEntry, childVersion,
- bubbleParent, bubbleParentEntry, parentVersion, graphVersion);
+ + "child: %s childEntry: %s, childVersion: %s"
+ + "bubbleParent: %s bubbleParentEntry: %s, parentVersion: %s, graphVersion: %s",
+ errorKey,
+ errorEntry,
+ childVersion,
+ bubbleParent,
+ bubbleParentEntry,
+ parentVersion,
+ evaluatorContext.getGraphVersion());
continue;
}
- if (visitor.isInflight(bubbleParent)
+ if (evaluatorContext.getVisitor().isInflight(bubbleParent)
&& bubbleParentEntry.getTemporaryDirectDeps().expensiveContains(errorKey)) {
// Only bubble up to parent if it's part of this build. If this node was dirtied and
// re-evaluated, but in a build without this parent, we may try to bubble up to that
@@ -1648,7 +961,7 @@ public final class ParallelEvaluator implements Evaluator {
}
Preconditions.checkNotNull(parentEntry, "%s %s", errorKey, parent);
errorKey = parent;
- SkyFunction factory = skyFunctions.get(parent.functionName());
+ SkyFunction factory = evaluatorContext.getSkyFunctions().get(parent.functionName());
if (parentEntry.isDirty()) {
switch (parentEntry.getDirtyState()) {
case CHECK_DEPENDENCIES:
@@ -1671,7 +984,7 @@ public final class ParallelEvaluator implements Evaluator {
new GroupedList<SkyKey>(),
bubbleErrorInfo,
ImmutableSet.<SkyKey>of(),
- visitor);
+ evaluatorContext);
externalInterrupt = externalInterrupt || Thread.currentThread().isInterrupted();
try {
// This build is only to check if the parent node can give us a better error. We don't
@@ -1723,17 +1036,16 @@ public final class ParallelEvaluator implements Evaluator {
* {@code skyKeys} are known to be in the DONE state ({@code entry.isDone()} returns true).
*/
private <T extends SkyValue> EvaluationResult<T> constructResult(
- @Nullable ValueVisitor visitor,
Iterable<SkyKey> skyKeys,
@Nullable Map<SkyKey, ValueWithMetadata> bubbleErrorInfo,
boolean catastrophe)
throws InterruptedException {
Preconditions.checkState(
- catastrophe == (keepGoing && bubbleErrorInfo != null),
+ catastrophe == (evaluatorContext.keepGoing() && bubbleErrorInfo != null),
"Catastrophe not consistent with keepGoing mode and bubbleErrorInfo: %s %s %s %s",
skyKeys,
catastrophe,
- keepGoing,
+ evaluatorContext.keepGoing(),
bubbleErrorInfo);
EvaluationResult.Builder<T> result = EvaluationResult.builder();
List<SkyKey> cycleRoots = new ArrayList<>();
@@ -1758,10 +1070,12 @@ public final class ParallelEvaluator implements Evaluator {
// modes [skyframe-core]
// Note that replaying events here is only necessary on null builds, because otherwise we
// would have already printed the transitive messages after building these values.
- replayingNestedSetEventVisitor.visit(valueWithMetadata.getTransitiveEvents());
+ evaluatorContext
+ .getReplayingNestedSetEventVisitor()
+ .visit(valueWithMetadata.getTransitiveEvents());
ErrorInfo errorInfo = valueWithMetadata.getErrorInfo();
Preconditions.checkState(value != null || errorInfo != null, skyKey);
- if (!keepGoing && errorInfo != null) {
+ if (!evaluatorContext.keepGoing() && errorInfo != null) {
// value will be null here unless the value was already built on a prior keepGoing build.
result.addError(skyKey, errorInfo);
continue;
@@ -1776,8 +1090,7 @@ public final class ParallelEvaluator implements Evaluator {
}
}
if (!cycleRoots.isEmpty()) {
- Preconditions.checkState(visitor != null, skyKeys);
- checkForCycles(cycleRoots, result, visitor, keepGoing);
+ checkForCycles(cycleRoots, result);
}
if (catastrophe) {
// We may not have a top-level node completed. Inform the caller of the catastrophic exception
@@ -1804,24 +1117,20 @@ public final class ParallelEvaluator implements Evaluator {
}
private <T extends SkyValue> void checkForCycles(
- Iterable<SkyKey> badRoots,
- EvaluationResult.Builder<T> result,
- final ValueVisitor visitor,
- boolean keepGoing)
- throws InterruptedException {
+ Iterable<SkyKey> badRoots, EvaluationResult.Builder<T> result) throws InterruptedException {
try (AutoProfiler p = AutoProfiler.logged("Checking for Skyframe cycles", LOG, 10)) {
for (SkyKey root : badRoots) {
- ErrorInfo errorInfo = checkForCycles(root, visitor, keepGoing);
+ ErrorInfo errorInfo = checkForCycles(root);
if (errorInfo == null) {
// This node just wasn't finished when evaluation aborted -- there were no cycles below
// it.
- Preconditions.checkState(!keepGoing, "", root, badRoots);
+ Preconditions.checkState(!evaluatorContext.keepGoing(), "", root, badRoots);
continue;
}
Preconditions.checkState(!Iterables.isEmpty(errorInfo.getCycleInfo()),
"%s was not evaluated, but was not part of a cycle", root);
result.addError(root, errorInfo);
- if (!keepGoing) {
+ if (!evaluatorContext.keepGoing()) {
return;
}
}
@@ -1847,8 +1156,7 @@ public final class ParallelEvaluator implements Evaluator {
* continue. Once all of a node's children are done, we construct an error value for it, based on
* those children. Finally, when the original root's node is constructed, we return its ErrorInfo.
*/
- private ErrorInfo checkForCycles(SkyKey root, ValueVisitor visitor, boolean keepGoing)
- throws InterruptedException {
+ private ErrorInfo checkForCycles(SkyKey root) throws InterruptedException {
// The number of cycles found. Do not keep on searching for more cycles after this many were
// found.
int cyclesFound = 0;
@@ -1883,7 +1191,7 @@ public final class ParallelEvaluator implements Evaluator {
if (entry.isDone()) {
continue;
}
- if (!keepGoing) {
+ if (!evaluatorContext.keepGoing()) {
// in the --nokeep_going mode, we would have already returned if we'd found a cycle below
// this node. The fact that we haven't means that there were no cycles below this node
// -- it just hadn't finished evaluating. So skip it.
@@ -1911,7 +1219,7 @@ public final class ParallelEvaluator implements Evaluator {
key,
directDeps,
Sets.difference(entry.getAllRemainingDirtyDirectDeps(), removedDeps),
- visitor);
+ evaluatorContext);
env.setError(
entry, ErrorInfo.fromChildErrors(key, errorDeps), /*isDirectlyTransient=*/false);
env.commit(entry, /*enqueueParents=*/false);
@@ -1944,7 +1252,7 @@ public final class ParallelEvaluator implements Evaluator {
entry.signalDep();
maybeMarkRebuilding(entry);
}
- if (keepGoing) {
+ if (evaluatorContext.keepGoing()) {
// Any children of this node that we haven't already visited are not worth visiting,
// since this node is about to be done. Thus, the only child worth visiting is the one in
// this cycle, the cycleChild (which may == key if this cycle is a self-edge).
@@ -1961,7 +1269,7 @@ public final class ParallelEvaluator implements Evaluator {
entry.getTemporaryDirectDeps(),
ImmutableMap.of(cycleChild, dummyValue),
Sets.difference(entry.getAllRemainingDirtyDirectDeps(), removedDeps),
- visitor);
+ evaluatorContext);
// Construct error info for this node. Get errors from children, which are all done
// except possibly for the cycleChild.
@@ -2013,7 +1321,7 @@ public final class ParallelEvaluator implements Evaluator {
toVisit.push(nextValue);
}
}
- return keepGoing ? getAndCheckDoneForCycle(root).getErrorInfo() : null;
+ return evaluatorContext.keepGoing() ? getAndCheckDoneForCycle(root).getErrorInfo() : null;
}
/**
@@ -2056,7 +1364,7 @@ public final class ParallelEvaluator implements Evaluator {
Iterable<SkyKey> children, SkyKey unfinishedChild) throws InterruptedException {
List<ErrorInfo> allErrors = new ArrayList<>();
Set<? extends Entry<SkyKey, ? extends NodeEntry>> childEntries =
- getBatchValues(null, Reason.CYCLE_CHECKING, children).entrySet();
+ evaluatorContext.getBatchValues(null, Reason.CYCLE_CHECKING, children).entrySet();
for (Entry<SkyKey, ? extends NodeEntry> childMapEntry : childEntries) {
SkyKey childKey = childMapEntry.getKey();
NodeEntry childNodeEntry = childMapEntry.getValue();
@@ -2163,17 +1471,16 @@ public final class ParallelEvaluator implements Evaluator {
private Map<SkyKey, ? extends NodeEntry> getAndCheckDoneBatchForCycle(
SkyKey parent, Iterable<SkyKey> keys) throws InterruptedException {
- Map<SkyKey, ? extends NodeEntry> nodes = getBatchValues(parent, Reason.CYCLE_CHECKING, keys);
+ Map<SkyKey, ? extends NodeEntry> nodes =
+ evaluatorContext.getBatchValues(parent, Reason.CYCLE_CHECKING, keys);
for (Entry<SkyKey, ? extends NodeEntry> nodeEntryMapEntry : nodes.entrySet()) {
checkDone(nodeEntryMapEntry.getKey(), nodeEntryMapEntry.getValue());
}
return nodes;
}
- private static final SkyValue NULL_MARKER = new SkyValue() {};
-
@Nullable
- private static SkyValue maybeGetValueFromError(
+ static SkyValue maybeGetValueFromError(
SkyKey key,
@Nullable NodeEntry entry,
@Nullable Map<SkyKey, ValueWithMetadata> bubbleErrorInfo)
@@ -2185,16 +1492,12 @@ public final class ParallelEvaluator implements Evaluator {
return isDoneForBuild(entry) ? entry.getValueMaybeWithMetadata() : null;
}
- private static SkyValue getValueOrNullMarker(@Nullable NodeEntry nodeEntry)
- throws InterruptedException {
- return isDoneForBuild(nodeEntry) ? nodeEntry.getValueMaybeWithMetadata() : NULL_MARKER;
- }
/**
* Return true if the entry does not need to be re-evaluated this build. The entry will need to be
* re-evaluated if it is not done, but also if it was not completely evaluated last build and this
* build is keepGoing.
*/
- private static boolean isDoneForBuild(@Nullable NodeEntry entry) {
+ static boolean isDoneForBuild(@Nullable NodeEntry entry) {
return entry != null && entry.isDone();
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java
new file mode 100644
index 0000000000..605c1a6566
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluatorContext.java
@@ -0,0 +1,216 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.skyframe;
+
+import static com.google.devtools.build.skyframe.QueryableGraph.Reason;
+
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
+import com.google.devtools.build.lib.collect.nestedset.NestedSetVisitor;
+import com.google.devtools.build.lib.events.Event;
+import com.google.devtools.build.lib.events.EventHandler;
+import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.skyframe.MemoizingEvaluator.EmittedEventState;
+import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
+import javax.annotation.Nullable;
+
+/**
+ * Context object holding sufficient information for {@link SkyFunctionEnvironment} to perform its
+ * duties. Shared among all {@link SkyFunctionEnvironment} instances, which should regard this
+ * object as a read-only collection of data.
+ */
+class ParallelEvaluatorContext {
+ private final QueryableGraph graph;
+ private final Version graphVersion;
+ private final ImmutableMap<SkyFunctionName, ? extends SkyFunction> skyFunctions;
+ private final EventHandler reporter;
+ private final NestedSetVisitor<TaggedEvents> replayingNestedSetEventVisitor;
+ private final boolean keepGoing;
+ private final boolean storeErrorsAlongsideValues;
+ @Nullable private final EvaluationProgressReceiver progressReceiver;
+ private final EventFilter storedEventFilter;
+ /**
+ * The visitor managing the thread pool. Used to enqueue parents when an entry is finished, and,
+ * during testing, to block until an exception is thrown if a node builder requests that.
+ * Initialized after construction to avoid the overhead of the caller's creating a threadpool in
+ * cases where it is not needed.
+ */
+ private final Supplier<NodeEntryVisitor> visitorSupplier;
+
+ ParallelEvaluatorContext(
+ QueryableGraph graph,
+ Version graphVersion,
+ ImmutableMap<SkyFunctionName, ? extends SkyFunction> skyFunctions,
+ EventHandler reporter,
+ EmittedEventState emittedEventState,
+ boolean keepGoing,
+ boolean storeErrorsAlongsideValues,
+ final EvaluationProgressReceiver progressReceiver,
+ EventFilter storedEventFilter,
+ final DirtyKeyTracker dirtyKeyTracker,
+ final Function<SkyKey, Runnable> runnableMaker,
+ final int threadCount) {
+ this.graph = graph;
+ this.graphVersion = graphVersion;
+ this.skyFunctions = skyFunctions;
+ this.reporter = reporter;
+ this.replayingNestedSetEventVisitor =
+ new NestedSetVisitor<>(new NestedSetEventReceiver(reporter), emittedEventState);
+ this.keepGoing = keepGoing;
+ this.storeErrorsAlongsideValues = storeErrorsAlongsideValues;
+ this.progressReceiver = progressReceiver;
+ this.storedEventFilter = storedEventFilter;
+ visitorSupplier =
+ Suppliers.memoize(
+ new Supplier<NodeEntryVisitor>() {
+ @Override
+ public NodeEntryVisitor get() {
+ return new NodeEntryVisitor(
+ threadCount, dirtyKeyTracker, progressReceiver, runnableMaker);
+ }
+ });
+ }
+
+ ParallelEvaluatorContext(
+ QueryableGraph graph,
+ Version graphVersion,
+ ImmutableMap<SkyFunctionName, ? extends SkyFunction> skyFunctions,
+ EventHandler reporter,
+ EmittedEventState emittedEventState,
+ boolean keepGoing,
+ boolean storeErrorsAlongsideValues,
+ final EvaluationProgressReceiver progressReceiver,
+ EventFilter storedEventFilter,
+ final DirtyKeyTracker dirtyKeyTracker,
+ final Function<SkyKey, Runnable> runnableMaker,
+ final ForkJoinPool forkJoinPool) {
+ this.graph = graph;
+ this.graphVersion = graphVersion;
+ this.skyFunctions = skyFunctions;
+ this.reporter = reporter;
+ this.replayingNestedSetEventVisitor =
+ new NestedSetVisitor<>(new NestedSetEventReceiver(reporter), emittedEventState);
+ this.keepGoing = keepGoing;
+ this.storeErrorsAlongsideValues = storeErrorsAlongsideValues;
+ this.progressReceiver = progressReceiver;
+ this.storedEventFilter = storedEventFilter;
+ visitorSupplier =
+ Suppliers.memoize(
+ new Supplier<NodeEntryVisitor>() {
+ @Override
+ public NodeEntryVisitor get() {
+ return new NodeEntryVisitor(
+ forkJoinPool, dirtyKeyTracker, progressReceiver, runnableMaker);
+ }
+ });
+ }
+
+ Map<SkyKey, ? extends NodeEntry> getBatchValues(
+ @Nullable SkyKey parent, Reason reason, Iterable<SkyKey> keys) throws InterruptedException {
+ return graph.getBatch(parent, reason, keys);
+ }
+
+ /**
+ * Signals all parents that this node is finished. If {@code enqueueParents} is true, also
+ * enqueues any parents that are ready. Otherwise, this indicates that we are building this node
+ * after the main build aborted, so skip any parents that are already done (that can happen with
+ * cycles).
+ */
+ void signalValuesAndEnqueueIfReady(
+ SkyKey skyKey, Iterable<SkyKey> keys, Version version, boolean enqueueParents)
+ throws InterruptedException {
+ // No fields of the entry are needed here, since we're just enqueuing for evaluation, but more
+ // importantly, these hints are not respected for not-done nodes. If they are, we may need to
+ // alter this hint.
+ Map<SkyKey, ? extends NodeEntry> batch = graph.getBatch(skyKey, Reason.SIGNAL_DEP, keys);
+ if (enqueueParents) {
+ for (SkyKey key : keys) {
+ NodeEntry entry = Preconditions.checkNotNull(batch.get(key), key);
+ if (entry.signalDep(version)) {
+ getVisitor().enqueueEvaluation(key);
+ }
+ }
+ } else {
+ for (SkyKey key : keys) {
+ NodeEntry entry = Preconditions.checkNotNull(batch.get(key), key);
+ if (!entry.isDone()) {
+ // In cycles, we can have parents that are already done.
+ entry.signalDep(version);
+ }
+ }
+ }
+ }
+
+ QueryableGraph getGraph() {
+ return graph;
+ }
+
+ Version getGraphVersion() {
+ return graphVersion;
+ }
+
+ boolean keepGoing() {
+ return keepGoing;
+ }
+
+ NodeEntryVisitor getVisitor() {
+ return visitorSupplier.get();
+ }
+
+ @Nullable
+ EvaluationProgressReceiver getProgressReceiver() {
+ return progressReceiver;
+ }
+
+ NestedSetVisitor<TaggedEvents> getReplayingNestedSetEventVisitor() {
+ return replayingNestedSetEventVisitor;
+ }
+
+ EventHandler getReporter() {
+ return reporter;
+ }
+
+ ImmutableMap<SkyFunctionName, ? extends SkyFunction> getSkyFunctions() {
+ return skyFunctions;
+ }
+
+ EventFilter getStoredEventFilter() {
+ return storedEventFilter;
+ }
+
+ boolean storeErrorsAlongsideValues() {
+ return storeErrorsAlongsideValues;
+ }
+
+ /** Receives the events from the NestedSet and delegates to the reporter. */
+ private static class NestedSetEventReceiver implements NestedSetVisitor.Receiver<TaggedEvents> {
+
+ private final EventHandler reporter;
+
+ public NestedSetEventReceiver(EventHandler reporter) {
+ this.reporter = reporter;
+ }
+
+ @Override
+ public void accept(TaggedEvents events) {
+ String tag = events.getTag();
+ for (Event e : events.getEvents()) {
+ reporter.handle(e.withTag(tag));
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/skyframe/SkyFunctionEnvironment.java b/src/main/java/com/google/devtools/build/skyframe/SkyFunctionEnvironment.java
new file mode 100644
index 0000000000..ec84b1b33e
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/skyframe/SkyFunctionEnvironment.java
@@ -0,0 +1,609 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.skyframe;
+
+import static com.google.devtools.build.skyframe.EvaluationProgressReceiver.EvaluationState;
+import static com.google.devtools.build.skyframe.NodeEntry.DependencyState;
+import static com.google.devtools.build.skyframe.ParallelEvaluator.isDoneForBuild;
+import static com.google.devtools.build.skyframe.ParallelEvaluator.maybeGetValueFromError;
+import static com.google.devtools.build.skyframe.QueryableGraph.Reason;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.devtools.build.lib.collect.nestedset.NestedSet;
+import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
+import com.google.devtools.build.lib.events.Event;
+import com.google.devtools.build.lib.events.EventHandler;
+import com.google.devtools.build.lib.events.StoredEventHandler;
+import com.google.devtools.build.lib.util.GroupedList;
+import com.google.devtools.build.lib.util.GroupedList.GroupedListHelper;
+import com.google.devtools.build.lib.util.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
+
+/** A {@link SkyFunction.Environment} implementation for {@link ParallelEvaluator}. */
+class SkyFunctionEnvironment extends AbstractSkyFunctionEnvironment {
+ private static final SkyValue NULL_MARKER = new SkyValue() {};
+ private static final boolean PREFETCH_OLD_DEPS =
+ Boolean.parseBoolean(
+ System.getProperty("skyframe.ParallelEvaluator.PrefetchOldDeps", "true"));
+
+ private boolean building = true;
+ private SkyKey depErrorKey = null;
+ private final SkyKey skyKey;
+ /**
+ * The deps requested during the previous build of this node. Used for two reasons: (1) They are
+ * fetched eagerly before the node is built, to potentially prime the graph and speed up requests
+ * for them during evaluation. (2) When the node finishes building, any deps from the previous
+ * build that are not deps from this build must have this node removed from them as a reverse dep.
+ * Thus, it is important that all nodes in this set have the property that they have this node as
+ * a reverse dep from the last build, but that this node has not added them as a reverse dep on
+ * this build. That set is normally {@link NodeEntry#getAllRemainingDirtyDirectDeps()}, but in
+ * certain corner cases, like cycles, further filtering may be needed.
+ */
+ private final Set<SkyKey> oldDeps;
+
+ private SkyValue value = null;
+ private ErrorInfo errorInfo = null;
+ private final Map<SkyKey, ValueWithMetadata> bubbleErrorInfo;
+ /** The values previously declared as dependencies. */
+ private final Map<SkyKey, NodeEntry> directDeps;
+
+ /**
+ * The grouped list of values requested during this build as dependencies. On a subsequent build,
+ * if this value is dirty, all deps in the same dependency group can be checked in parallel for
+ * changes. In other words, if dep1 and dep2 are in the same group, then dep1 will be checked in
+ * parallel with dep2. See {@link #getValues} for more.
+ */
+ private final GroupedListHelper<SkyKey> newlyRequestedDeps = new GroupedListHelper<>();
+
+ /** The set of errors encountered while fetching children. */
+ private final Collection<ErrorInfo> childErrorInfos = new LinkedHashSet<>();
+
+ private final StoredEventHandler eventHandler =
+ new StoredEventHandler() {
+ @Override
+ @SuppressWarnings("UnsynchronizedOverridesSynchronized") // only delegates to thread-safe.
+ public void handle(Event e) {
+ checkActive();
+ if (evaluatorContext.getStoredEventFilter().apply(e)) {
+ super.handle(e);
+ } else {
+ evaluatorContext.getReporter().handle(e);
+ }
+ }
+ };
+ private final ParallelEvaluatorContext evaluatorContext;
+
+ SkyFunctionEnvironment(
+ SkyKey skyKey,
+ GroupedList<SkyKey> directDeps,
+ Set<SkyKey> oldDeps,
+ ParallelEvaluatorContext evaluatorContext)
+ throws InterruptedException {
+ this(skyKey, directDeps, null, oldDeps, evaluatorContext);
+ }
+
+ SkyFunctionEnvironment(
+ SkyKey skyKey,
+ GroupedList<SkyKey> directDeps,
+ @Nullable Map<SkyKey, ValueWithMetadata> bubbleErrorInfo,
+ Set<SkyKey> oldDeps,
+ ParallelEvaluatorContext evaluatorContext)
+ throws InterruptedException {
+ this.skyKey = skyKey;
+ this.oldDeps = oldDeps;
+ this.evaluatorContext = evaluatorContext;
+ this.directDeps =
+ Collections.<SkyKey, NodeEntry>unmodifiableMap(
+ batchPrefetch(
+ skyKey, directDeps, oldDeps, /*assertDone=*/ bubbleErrorInfo == null, skyKey));
+ this.bubbleErrorInfo = bubbleErrorInfo;
+ Preconditions.checkState(
+ !this.directDeps.containsKey(ErrorTransienceValue.KEY),
+ "%s cannot have a dep on ErrorTransienceValue during building",
+ skyKey);
+ }
+
+ private Map<SkyKey, ? extends NodeEntry> batchPrefetch(
+ SkyKey requestor,
+ GroupedList<SkyKey> depKeys,
+ Set<SkyKey> oldDeps,
+ boolean assertDone,
+ SkyKey keyForDebugging)
+ throws InterruptedException {
+ Iterable<SkyKey> depKeysAsIterable = Iterables.concat(depKeys);
+ Iterable<SkyKey> keysToPrefetch = depKeysAsIterable;
+ if (PREFETCH_OLD_DEPS) {
+ ImmutableSet.Builder<SkyKey> keysToPrefetchBuilder = ImmutableSet.builder();
+ keysToPrefetchBuilder.addAll(depKeysAsIterable).addAll(oldDeps);
+ keysToPrefetch = keysToPrefetchBuilder.build();
+ }
+ Map<SkyKey, ? extends NodeEntry> batchMap =
+ evaluatorContext.getBatchValues(requestor, Reason.PREFETCH, keysToPrefetch);
+ if (PREFETCH_OLD_DEPS) {
+ batchMap =
+ ImmutableMap.<SkyKey, NodeEntry>copyOf(
+ Maps.filterKeys(batchMap, Predicates.in(ImmutableSet.copyOf(depKeysAsIterable))));
+ }
+ if (batchMap.size() != depKeys.numElements()) {
+ throw new IllegalStateException(
+ "Missing keys for "
+ + keyForDebugging
+ + ": "
+ + Sets.difference(depKeys.toSet(), batchMap.keySet()));
+ }
+ if (assertDone) {
+ for (Map.Entry<SkyKey, ? extends NodeEntry> entry : batchMap.entrySet()) {
+ Preconditions.checkState(
+ entry.getValue().isDone(), "%s had not done %s", keyForDebugging, entry);
+ }
+ }
+ return batchMap;
+ }
+
+ private void checkActive() {
+ Preconditions.checkState(building, skyKey);
+ }
+
+ NestedSet<TaggedEvents> buildEvents(NodeEntry entry, boolean missingChildren)
+ throws InterruptedException {
+ // Aggregate the nested set of events from the direct deps, also adding the events from
+ // building this value.
+ NestedSetBuilder<TaggedEvents> eventBuilder = NestedSetBuilder.stableOrder();
+ ImmutableList<Event> events = eventHandler.getEvents();
+ if (!events.isEmpty()) {
+ eventBuilder.add(new TaggedEvents(getTagFromKey(), events));
+ }
+ if (evaluatorContext.getStoredEventFilter().storeEvents()) {
+ // Only do the work of processing children if we're going to store events.
+ GroupedList<SkyKey> depKeys = entry.getTemporaryDirectDeps();
+ Collection<SkyValue> deps = getDepValuesForDoneNodeMaybeFromError(depKeys);
+ if (!missingChildren && depKeys.numElements() != deps.size()) {
+ throw new IllegalStateException(
+ "Missing keys for "
+ + skyKey
+ + ". Present values: "
+ + deps
+ + "requested from: "
+ + depKeys
+ + ", "
+ + entry);
+ }
+ for (SkyValue value : deps) {
+ eventBuilder.addTransitive(ValueWithMetadata.getEvents(value));
+ }
+ }
+ return eventBuilder.build();
+ }
+
+ /**
+ * If this node has an error, that is, if errorInfo is non-null, do nothing. Otherwise, set
+ * errorInfo to the union of the child errors that were recorded earlier by getValueOrException,
+ * if there are any.
+ *
+ * <p>Child errors are remembered, if there are any and yet the parent recovered without error, so
+ * that subsequent noKeepGoing evaluations can stop as soon as they encounter a node whose
+ * (transitive) children had experienced an error, even if that (transitive) parent node had been
+ * able to recover from it during a keepGoing build. This behavior can be suppressed by setting
+ * {@link ParallelEvaluatorContext#storeErrorsAlongsideValues} to false, which will cause nodes
+ * with values to have no stored error info. This may be useful if this graph will only ever be
+ * used for keepGoing builds, since in that case storing errors from recovered nodes is pointless.
+ */
+ private void finalizeErrorInfo() {
+ if (errorInfo == null
+ && (evaluatorContext.storeErrorsAlongsideValues() || value == null)
+ && !childErrorInfos.isEmpty()) {
+ errorInfo = ErrorInfo.fromChildErrors(skyKey, childErrorInfos);
+ }
+ }
+
+ void setValue(SkyValue newValue) {
+ Preconditions.checkState(
+ errorInfo == null && bubbleErrorInfo == null,
+ "%s %s %s %s",
+ skyKey,
+ newValue,
+ errorInfo,
+ bubbleErrorInfo);
+ Preconditions.checkState(value == null, "%s %s %s", skyKey, value, newValue);
+ value = newValue;
+ }
+
+ /**
+ * Set this node to be in error. The node's value must not have already been set. However, all
+ * dependencies of this node <i>must</i> already have been registered, since this method may
+ * register a dependence on the error transience node, which should always be the last dep.
+ */
+ void setError(NodeEntry state, ErrorInfo errorInfo, boolean isDirectlyTransient)
+ throws InterruptedException {
+ Preconditions.checkState(value == null, "%s %s %s", skyKey, value, errorInfo);
+ Preconditions.checkState(this.errorInfo == null, "%s %s %s", skyKey, this.errorInfo, errorInfo);
+
+ if (isDirectlyTransient) {
+ NodeEntry errorTransienceNode =
+ Preconditions.checkNotNull(
+ evaluatorContext
+ .getGraph()
+ .get(skyKey, Reason.RDEP_ADDITION, ErrorTransienceValue.KEY),
+ "Null error value? %s",
+ skyKey);
+ DependencyState triState;
+ if (oldDeps.contains(ErrorTransienceValue.KEY)) {
+ triState = errorTransienceNode.checkIfDoneForDirtyReverseDep(skyKey);
+ } else {
+ triState = errorTransienceNode.addReverseDepAndCheckIfDone(skyKey);
+ }
+ Preconditions.checkState(
+ triState == DependencyState.DONE, "%s %s %s", skyKey, triState, errorInfo);
+ state.addTemporaryDirectDeps(
+ GroupedListHelper.create(ImmutableList.of(ErrorTransienceValue.KEY)));
+ state.signalDep();
+ }
+
+ this.errorInfo = Preconditions.checkNotNull(errorInfo, skyKey);
+ }
+
+ private Map<SkyKey, SkyValue> getValuesMaybeFromError(Iterable<SkyKey> keys)
+ throws InterruptedException {
+ // Use a HashMap, not an ImmutableMap.Builder, because we have not yet deduplicated these keys
+ // and ImmutableMap.Builder does not tolerate duplicates. The map will be thrown away
+ // shortly in any case.
+ Map<SkyKey, SkyValue> result = new HashMap<>();
+ ArrayList<SkyKey> missingKeys = new ArrayList<>();
+ for (SkyKey key : keys) {
+ Preconditions.checkState(
+ !key.equals(ErrorTransienceValue.KEY),
+ "Error transience key cannot be in requested deps of %s",
+ skyKey);
+ SkyValue value = maybeGetValueFromErrorOrDeps(key);
+ if (value == null) {
+ missingKeys.add(key);
+ } else {
+ result.put(key, value);
+ }
+ }
+ Map<SkyKey, ? extends NodeEntry> missingEntries =
+ evaluatorContext.getBatchValues(skyKey, Reason.DEP_REQUESTED, missingKeys);
+ for (SkyKey key : missingKeys) {
+ result.put(key, getValueOrNullMarker(missingEntries.get(key)));
+ }
+ return result;
+ }
+
+ /**
+ * Returns just the values of the deps in {@code depKeys}, looking at {@code bubbleErrorInfo},
+ * {@link #directDeps}, and the backing {@link #evaluatorContext#graph} in that order. Any deps
+ * that are not yet done will not have their values present in the returned collection.
+ */
+ private Collection<SkyValue> getDepValuesForDoneNodeMaybeFromError(GroupedList<SkyKey> depKeys)
+ throws InterruptedException {
+ int keySize = depKeys.numElements();
+ List<SkyValue> result = new ArrayList<>(keySize);
+ // depKeys consists of all known deps of this entry. That should include all the keys in
+ // directDeps, and any keys in bubbleErrorInfo. We expect to have to retrieve the keys that
+ // are not in either one.
+ int expectedMissingKeySize =
+ Math.max(
+ keySize - directDeps.size() - (bubbleErrorInfo == null ? 0 : bubbleErrorInfo.size()),
+ 0);
+ ArrayList<SkyKey> missingKeys = new ArrayList<>(expectedMissingKeySize);
+ for (SkyKey key : Iterables.concat(depKeys)) {
+ SkyValue value = maybeGetValueFromErrorOrDeps(key);
+ if (value == null) {
+ missingKeys.add(key);
+ } else {
+ result.add(value);
+ }
+ }
+ for (NodeEntry entry :
+ evaluatorContext.getBatchValues(skyKey, Reason.DEP_REQUESTED, missingKeys).values()) {
+ result.add(getValueOrNullMarker(entry));
+ }
+ return result;
+ }
+
+ @Nullable
+ private SkyValue maybeGetValueFromErrorOrDeps(SkyKey key) throws InterruptedException {
+ return maybeGetValueFromError(key, directDeps.get(key), bubbleErrorInfo);
+ }
+
+ private static SkyValue getValueOrNullMarker(@Nullable NodeEntry nodeEntry)
+ throws InterruptedException {
+ return isDoneForBuild(nodeEntry) ? nodeEntry.getValueMaybeWithMetadata() : NULL_MARKER;
+ }
+
+ @Override
+ protected Map<SkyKey, ValueOrUntypedException> getValueOrUntypedExceptions(
+ Iterable<SkyKey> depKeys) throws InterruptedException {
+ checkActive();
+ Map<SkyKey, SkyValue> values = getValuesMaybeFromError(depKeys);
+ for (Map.Entry<SkyKey, SkyValue> depEntry : values.entrySet()) {
+ SkyKey depKey = depEntry.getKey();
+ SkyValue depValue = depEntry.getValue();
+ if (depValue == NULL_MARKER) {
+ if (directDeps.containsKey(depKey)) {
+ throw new IllegalStateException(
+ "Undone key "
+ + depKey
+ + " was already in deps of "
+ + skyKey
+ + "( dep: "
+ + evaluatorContext.getGraph().get(skyKey, Reason.OTHER, depKey)
+ + ", parent: "
+ + evaluatorContext.getGraph().get(null, Reason.OTHER, skyKey));
+ }
+ valuesMissing = true;
+ addDep(depKey);
+ continue;
+ }
+ ErrorInfo errorInfo = ValueWithMetadata.getMaybeErrorInfo(depEntry.getValue());
+ if (errorInfo != null) {
+ childErrorInfos.add(errorInfo);
+ if (bubbleErrorInfo != null) {
+ // Set interrupted status, to try to prevent the calling SkyFunction from doing anything
+ // fancy after this. SkyFunctions executed during error bubbling are supposed to
+ // (quickly) rethrow errors or return a value/null (but there's currently no way to
+ // enforce this).
+ Thread.currentThread().interrupt();
+ }
+ if ((!evaluatorContext.keepGoing() && bubbleErrorInfo == null)
+ || errorInfo.getException() == null) {
+ valuesMissing = true;
+ // We arbitrarily record the first child error if we are about to abort.
+ if (!evaluatorContext.keepGoing() && depErrorKey == null) {
+ depErrorKey = depKey;
+ }
+ }
+ }
+
+ if (!directDeps.containsKey(depKey)) {
+ if (bubbleErrorInfo == null) {
+ addDep(depKey);
+ }
+ evaluatorContext
+ .getReplayingNestedSetEventVisitor()
+ .visit(ValueWithMetadata.getEvents(depValue));
+ }
+ }
+
+ return Maps.transformValues(
+ values,
+ new Function<SkyValue, ValueOrUntypedException>() {
+ @Override
+ public ValueOrUntypedException apply(SkyValue maybeWrappedValue) {
+ if (maybeWrappedValue == NULL_MARKER) {
+ return ValueOrExceptionUtils.ofNull();
+ }
+ SkyValue justValue = ValueWithMetadata.justValue(maybeWrappedValue);
+ ErrorInfo errorInfo = ValueWithMetadata.getMaybeErrorInfo(maybeWrappedValue);
+
+ if (justValue != null && (evaluatorContext.keepGoing() || errorInfo == null)) {
+ // If the dep did compute a value, it is given to the caller if we are in
+ // keepGoing mode or if we are in noKeepGoingMode and there were no errors computing
+ // it.
+ return ValueOrExceptionUtils.ofValueUntyped(justValue);
+ }
+
+ // There was an error building the value, which we will either report by throwing an
+ // exception or insulate the caller from by returning null.
+ Preconditions.checkNotNull(errorInfo, "%s %s", skyKey, maybeWrappedValue);
+ Exception exception = errorInfo.getException();
+
+ if (!evaluatorContext.keepGoing() && exception != null && bubbleErrorInfo == null) {
+ // Child errors should not be propagated in noKeepGoing mode (except during error
+ // bubbling). Instead we should fail fast.
+ return ValueOrExceptionUtils.ofNull();
+ }
+
+ if (exception != null) {
+ // Give builder a chance to handle this exception.
+ return ValueOrExceptionUtils.ofExn(exception);
+ }
+ // In a cycle.
+ Preconditions.checkState(
+ !Iterables.isEmpty(errorInfo.getCycleInfo()),
+ "%s %s %s",
+ skyKey,
+ errorInfo,
+ maybeWrappedValue);
+ return ValueOrExceptionUtils.ofNull();
+ }
+ });
+ }
+
+ @Override
+ public <
+ E1 extends Exception,
+ E2 extends Exception,
+ E3 extends Exception,
+ E4 extends Exception,
+ E5 extends Exception>
+ Map<SkyKey, ValueOrException5<E1, E2, E3, E4, E5>> getValuesOrThrow(
+ Iterable<SkyKey> depKeys,
+ Class<E1> exceptionClass1,
+ Class<E2> exceptionClass2,
+ Class<E3> exceptionClass3,
+ Class<E4> exceptionClass4,
+ Class<E5> exceptionClass5)
+ throws InterruptedException {
+ newlyRequestedDeps.startGroup();
+ Map<SkyKey, ValueOrException5<E1, E2, E3, E4, E5>> result =
+ super.getValuesOrThrow(
+ depKeys,
+ exceptionClass1,
+ exceptionClass2,
+ exceptionClass3,
+ exceptionClass4,
+ exceptionClass5);
+ newlyRequestedDeps.endGroup();
+ return result;
+ }
+
+ private void addDep(SkyKey key) {
+ if (!newlyRequestedDeps.contains(key)) {
+ // dep may have been requested already this evaluation. If not, add it.
+ newlyRequestedDeps.add(key);
+ }
+ }
+
+ /**
+ * If {@code !keepGoing} and there is at least one dep in error, returns a dep in error. Otherwise
+ * returns {@code null}.
+ */
+ @Nullable
+ SkyKey getDepErrorKey() {
+ return depErrorKey;
+ }
+
+ @Override
+ public EventHandler getListener() {
+ checkActive();
+ return eventHandler;
+ }
+
+ void doneBuilding() {
+ building = false;
+ }
+
+ GroupedListHelper<SkyKey> getNewlyRequestedDeps() {
+ return newlyRequestedDeps;
+ }
+
+ Collection<NodeEntry> getDirectDepsValues() {
+ return directDeps.values();
+ }
+
+ Collection<ErrorInfo> getChildErrorInfos() {
+ return childErrorInfos;
+ }
+
+ /**
+ * Apply the change to the graph (mostly) atomically and signal all nodes that are waiting for
+ * this node to complete. Adding nodes and signaling is not atomic, but may need to be changed for
+ * interruptibility.
+ *
+ * <p>Parents are only enqueued if {@code enqueueParents} holds. Parents should be enqueued unless
+ * (1) this node is being built after the main evaluation has aborted, or (2) this node is being
+ * built with --nokeep_going, and so we are about to shut down the main evaluation anyway.
+ *
+ * <p>The node entry is informed if the node's value and error are definitive via the flag {@code
+ * completeValue}.
+ */
+ void commit(NodeEntry primaryEntry, boolean enqueueParents) throws InterruptedException {
+ // Construct the definitive error info, if there is one.
+ finalizeErrorInfo();
+
+ // We have the following implications:
+ // errorInfo == null => value != null => enqueueParents.
+ // All these implications are strict:
+ // (1) errorInfo != null && value != null happens for values with recoverable errors.
+ // (2) value == null && enqueueParents happens for values that are found to have errors
+ // during a --keep_going build.
+
+ NestedSet<TaggedEvents> events = buildEvents(primaryEntry, /*missingChildren=*/ false);
+ Version valueVersion;
+ SkyValue valueWithMetadata;
+ if (value == null) {
+ Preconditions.checkNotNull(errorInfo, "%s %s", skyKey, primaryEntry);
+ valueWithMetadata = ValueWithMetadata.error(errorInfo, events);
+ } else {
+ // We must be enqueueing parents if we have a value.
+ Preconditions.checkState(enqueueParents, "%s %s", skyKey, primaryEntry);
+ valueWithMetadata = ValueWithMetadata.normal(value, errorInfo, events);
+ }
+ if (!oldDeps.isEmpty()) {
+ // Remove the rdep on this entry for each of its old deps that is no longer a direct dep.
+ Set<SkyKey> depsToRemove =
+ Sets.difference(oldDeps, primaryEntry.getTemporaryDirectDeps().toSet());
+ Collection<? extends NodeEntry> oldDepEntries =
+ evaluatorContext.getGraph().getBatch(skyKey, Reason.RDEP_REMOVAL, depsToRemove).values();
+ for (NodeEntry oldDepEntry : oldDepEntries) {
+ oldDepEntry.removeReverseDep(skyKey);
+ }
+ }
+ // If this entry is dirty, setValue may not actually change it, if it determines that
+ // the data being written now is the same as the data already present in the entry.
+ // We could consider using max(childVersions) here instead of graphVersion. When full
+ // versioning is implemented, this would allow evaluation at a version between
+ // max(childVersions) and graphVersion to re-use this result.
+ Set<SkyKey> reverseDeps =
+ primaryEntry.setValue(valueWithMetadata, evaluatorContext.getGraphVersion());
+ // Note that if this update didn't actually change the value entry, this version may not
+ // be the graph version.
+ valueVersion = primaryEntry.getVersion();
+ Preconditions.checkState(
+ valueVersion.atMost(evaluatorContext.getGraphVersion()),
+ "%s should be at most %s in the version partial ordering",
+ valueVersion,
+ evaluatorContext.getGraphVersion());
+ if (evaluatorContext.getProgressReceiver() != null) {
+ // Tell the receiver that this value was built. If valueVersion.equals(graphVersion), it
+ // was evaluated this run, and so was changed. Otherwise, it is less than graphVersion,
+ // by the Preconditions check above, and was not actually changed this run -- when it was
+ // written above, its version stayed below this update's version, so its value remains the
+ // same as before.
+ // We use a SkyValueSupplier here because it keeps a reference to the entry, allowing for
+ // the receiver to be confident that the entry is readily accessible in memory.
+ evaluatorContext
+ .getProgressReceiver()
+ .evaluated(
+ skyKey,
+ new SkyValueSupplier(primaryEntry),
+ valueVersion.equals(evaluatorContext.getGraphVersion())
+ ? EvaluationState.BUILT
+ : EvaluationState.CLEAN);
+ }
+ evaluatorContext.signalValuesAndEnqueueIfReady(
+ skyKey, reverseDeps, valueVersion, enqueueParents);
+
+ evaluatorContext.getVisitor().notifyDone(skyKey);
+ evaluatorContext.getReplayingNestedSetEventVisitor().visit(events);
+ }
+
+ @Nullable
+ private String getTagFromKey() {
+ return evaluatorContext.getSkyFunctions().get(skyKey.functionName()).extractTag(skyKey);
+ }
+
+ /**
+ * Gets the latch that is counted down when an exception is thrown in {@code
+ * AbstractQueueVisitor}. For use in tests to check if an exception actually was thrown. Calling
+ * {@code AbstractQueueVisitor#awaitExceptionForTestingOnly} can throw a spurious {@link
+ * InterruptedException} because {@link CountDownLatch#await} checks the interrupted bit before
+ * returning, even if the latch is already at 0. See bug "testTwoErrors is flaky".
+ */
+ CountDownLatch getExceptionLatchForTesting() {
+ return evaluatorContext.getVisitor().getExceptionLatchForTestingOnly();
+ }
+
+ @Override
+ public boolean inErrorBubblingForTesting() {
+ return bubbleErrorInfo != null;
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/skyframe/SkyValueSupplier.java b/src/main/java/com/google/devtools/build/skyframe/SkyValueSupplier.java
new file mode 100644
index 0000000000..d05dbd729f
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/skyframe/SkyValueSupplier.java
@@ -0,0 +1,42 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.skyframe;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Supplier of {@link SkyValue} that crashes if its contained {@link NodeEntry} throws an {@link
+ * InterruptedException} on value retrieval.
+ */
+class SkyValueSupplier implements Supplier<SkyValue> {
+ private final NodeEntry state;
+
+ SkyValueSupplier(NodeEntry state) {
+ this.state = state;
+ }
+
+ @Override
+ public SkyValue get() {
+ try {
+ return state.getValue();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "Graph implementations in which value retrieval can block should not be used in "
+ + "frameworks that use the value in EvaluationProgressReceiver, since that could "
+ + "result in significant slowdowns: "
+ + state,
+ e);
+ }
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/skyframe/ChainedFunction.java b/src/test/java/com/google/devtools/build/skyframe/ChainedFunction.java
index fd2f44b362..a2976d2348 100644
--- a/src/test/java/com/google/devtools/build/skyframe/ChainedFunction.java
+++ b/src/test/java/com/google/devtools/build/skyframe/ChainedFunction.java
@@ -16,11 +16,8 @@ package com.google.devtools.build.skyframe;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.skyframe.GraphTester.ValueComputer;
-import com.google.devtools.build.skyframe.ParallelEvaluator.SkyFunctionEnvironment;
import com.google.devtools.build.skyframe.SkyFunctionException.Transience;
-
import java.util.concurrent.CountDownLatch;
-
import javax.annotation.Nullable;
/**
diff --git a/src/test/java/com/google/devtools/build/skyframe/MemoizingEvaluatorTest.java b/src/test/java/com/google/devtools/build/skyframe/MemoizingEvaluatorTest.java
index 0afcb62c5c..90b67816e1 100644
--- a/src/test/java/com/google/devtools/build/skyframe/MemoizingEvaluatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/MemoizingEvaluatorTest.java
@@ -3977,7 +3977,7 @@ public class MemoizingEvaluatorTest {
public SkyValue compute(SkyKey skyKey, Environment env) throws InterruptedException {
shutdownAwaiterStarted.countDown();
TrackingAwaiter.INSTANCE.awaitLatchAndTrackExceptions(
- ((ParallelEvaluator.SkyFunctionEnvironment) env).getExceptionLatchForTesting(),
+ ((SkyFunctionEnvironment) env).getExceptionLatchForTesting(),
"exception not thrown");
// Threadpool is shutting down. Don't try to synchronize anything in the future
// during error bubbling.
diff --git a/src/test/java/com/google/devtools/build/skyframe/ParallelEvaluatorTest.java b/src/test/java/com/google/devtools/build/skyframe/ParallelEvaluatorTest.java
index b9f97815df..a901a29200 100644
--- a/src/test/java/com/google/devtools/build/skyframe/ParallelEvaluatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/ParallelEvaluatorTest.java
@@ -46,7 +46,6 @@ import com.google.devtools.build.skyframe.GraphTester.StringValue;
import com.google.devtools.build.skyframe.NotifyingHelper.EventType;
import com.google.devtools.build.skyframe.NotifyingHelper.Listener;
import com.google.devtools.build.skyframe.NotifyingHelper.Order;
-import com.google.devtools.build.skyframe.ParallelEvaluator.EventFilter;
import com.google.devtools.build.skyframe.SkyFunctionException.Transience;
import java.util.ArrayList;
import java.util.Arrays;