aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
diff options
context:
space:
mode:
authorGravatar Janak Ramakrishnan <janakr@google.com>2016-08-25 22:42:37 +0000
committerGravatar John Cater <jcater@google.com>2016-08-26 18:39:02 +0000
commitb449e3fa913a99c76b22f3301d1ec29c12b9e2f9 (patch)
tree96816087422822279870642a4bc64111524a2f17 /src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
parent7576acc4702962af9143e49a6e0da89490449ca2 (diff)
Refactor ParallelEvaluator in preparation for making it more modular with respect to cycle checking.
Reducing the size of ParallelEvaluator.java is also probably long overdue. I believe this change stands on its own, but if you don't think the third change is worth it, and this isn't worth it on its own, feel free to push back. -- MOS_MIGRATED_REVID=131340165
Diffstat (limited to 'src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java')
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java154
1 files changed, 154 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
new file mode 100644
index 0000000000..6d0dbfa6ff
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
@@ -0,0 +1,154 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.skyframe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Sets;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.ErrorClassifier;
+import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
+import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Threadpool manager for {@link ParallelEvaluator}. Wraps a {@link QuiescingExecutor} and keeps
+ * track of pending nodes.
+ */
+class NodeEntryVisitor {
+ private static final ErrorClassifier NODE_ENTRY_VISITOR_ERROR_CLASSIFIER =
+ new ErrorClassifier() {
+ @Override
+ protected ErrorClassification classifyException(Exception e) {
+ if (e instanceof SchedulerException) {
+ return ErrorClassification.CRITICAL;
+ }
+ if (e instanceof RuntimeException) {
+ // We treat non-SchedulerException RuntimeExceptions as more severe than
+ // SchedulerExceptions so that AbstractQueueVisitor will propagate instances of the
+ // former. They indicate actual Blaze bugs, rather than normal Skyframe evaluation
+ // control flow.
+ return ErrorClassification.CRITICAL_AND_LOG;
+ }
+ return ErrorClassification.NOT_CRITICAL;
+ }
+ };
+
+ private final QuiescingExecutor quiescingExecutor;
+ private final AtomicBoolean preventNewEvaluations = new AtomicBoolean(false);
+ private final Set<SkyKey> inflightNodes = Sets.newConcurrentHashSet();
+ private final Set<RuntimeException> crashes = Sets.newConcurrentHashSet();
+ private final DirtyKeyTracker dirtyKeyTracker;
+ private final EvaluationProgressReceiver progressReceiver;
+ /**
+ * Function that allows this visitor to execute the appropriate {@link Runnable} when given a
+ * {@link SkyKey} to evaluate.
+ */
+ private final Function<SkyKey, Runnable> runnableMaker;
+
+ NodeEntryVisitor(
+ ForkJoinPool forkJoinPool,
+ DirtyKeyTracker dirtyKeyTracker,
+ EvaluationProgressReceiver progressReceiver,
+ Function<SkyKey, Runnable> runnableMaker) {
+ quiescingExecutor =
+ new ForkJoinQuiescingExecutor(forkJoinPool, NODE_ENTRY_VISITOR_ERROR_CLASSIFIER);
+ this.dirtyKeyTracker = dirtyKeyTracker;
+ this.progressReceiver = progressReceiver;
+ this.runnableMaker = runnableMaker;
+ }
+
+ NodeEntryVisitor(
+ int threadCount,
+ DirtyKeyTracker dirtyKeyTracker,
+ EvaluationProgressReceiver progressReceiver,
+ Function<SkyKey, Runnable> runnableMaker) {
+ quiescingExecutor =
+ new AbstractQueueVisitor(
+ /*concurrent*/ true,
+ threadCount,
+ /*keepAliveTime=*/ 1,
+ TimeUnit.SECONDS,
+ /*failFastOnException*/ true,
+ "skyframe-evaluator",
+ NODE_ENTRY_VISITOR_ERROR_CLASSIFIER);
+ this.dirtyKeyTracker = dirtyKeyTracker;
+ this.progressReceiver = progressReceiver;
+ this.runnableMaker = runnableMaker;
+ }
+
+ void waitForCompletion() throws InterruptedException {
+ quiescingExecutor.awaitQuiescence(/*interruptWorkers=*/ true);
+ }
+
+ void enqueueEvaluation(SkyKey key) {
+ // We unconditionally add the key to the set of in-flight nodes because even if evaluation is
+ // never scheduled we still want to remove the previously created NodeEntry from the graph.
+ // Otherwise we would leave the graph in a weird state (wasteful garbage in the best case and
+ // inconsistent in the worst case).
+ boolean newlyEnqueued = inflightNodes.add(key);
+ // All nodes enqueued for evaluation will be either verified clean, re-evaluated, or cleaned
+ // up after being in-flight when an error happens in nokeep_going mode or in the event of an
+ // interrupt. In any of these cases, they won't be dirty anymore.
+ if (newlyEnqueued) {
+ dirtyKeyTracker.notDirty(key);
+ }
+ if (preventNewEvaluations.get()) {
+ return;
+ }
+ if (newlyEnqueued && progressReceiver != null) {
+ progressReceiver.enqueueing(key);
+ }
+ quiescingExecutor.execute(runnableMaker.apply(key));
+ }
+
+ /**
+ * Stop any new evaluations from being enqueued. Returns whether this was the first thread to
+ * request a halt. If true, this thread should proceed to throw an exception. If false, another
+ * thread already requested a halt and will throw an exception, and so this thread can simply end.
+ */
+ boolean preventNewEvaluations() {
+ return preventNewEvaluations.compareAndSet(false, true);
+ }
+
+ void noteCrash(RuntimeException e) {
+ crashes.add(e);
+ }
+
+ Collection<RuntimeException> getCrashes() {
+ return crashes;
+ }
+
+ void notifyDone(SkyKey key) {
+ inflightNodes.remove(key);
+ }
+
+ boolean isInflight(SkyKey key) {
+ return inflightNodes.contains(key);
+ }
+
+ Set<SkyKey> getInflightNodes() {
+ return inflightNodes;
+ }
+
+ @VisibleForTesting
+ CountDownLatch getExceptionLatchForTestingOnly() {
+ return quiescingExecutor.getExceptionLatchForTestingOnly();
+ }
+}