aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2015-10-30 22:37:04 +0000
committerGravatar Florian Weikert <fwe@google.com>2015-11-02 16:55:33 +0000
commit1aa49da5e5f9e98ce3a2a0b96f387f2990a2363f (patch)
treeffca8b78242da3ccfc99959ea890c0cbdbf3c8ae /src/main/java/com/google/devtools
parent5350b3154665ebb37e10e6261686eb646ea23220 (diff)
Introduce ErrorClassifier
Changes the AbstractQueueVisitor strategy for varying its response to unhandled exceptions from inheritance to composition. This will help with a forthcoming switch from inheritance to delegation for ValueVisitor's use of AbstractQueueVisitor. -- MOS_MIGRATED_REVID=106730708
Diffstat (limited to 'src/main/java/com/google/devtools')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java126
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java59
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java34
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java39
4 files changed, 193 insertions, 65 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 9f49798bde..d5b6f1a7e9 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -148,6 +148,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
*/
private boolean jobsMustBeStopped = false;
+ private final ErrorClassifier errorClassifier;
+
private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName());
/**
@@ -182,7 +184,46 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
failFastOnException,
failFastOnInterrupt,
poolName,
- EXECUTOR_FACTORY);
+ EXECUTOR_FACTORY,
+ ErrorClassifier.DEFAULT);
+ }
+
+ /**
+ * Create the AbstractQueueVisitor.
+ *
+ * @param concurrent true if concurrency should be enabled. Only set to
+ * false for debugging.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
+ * @param keepAliveTime the keep-alive time for the thread pool.
+ * @param units the time units of keepAliveTime.
+ * @param failFastOnException if true, don't run new actions after
+ * an uncaught exception.
+ * @param failFastOnInterrupt if true, don't run new actions after interrupt.
+ * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
+ * thread naming will be used.
+ * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
+ */
+ public AbstractQueueVisitor(
+ boolean concurrent,
+ int parallelism,
+ long keepAliveTime,
+ TimeUnit units,
+ boolean failFastOnException,
+ boolean failFastOnInterrupt,
+ String poolName,
+ ErrorClassifier errorClassifier) {
+ this(
+ concurrent,
+ parallelism,
+ keepAliveTime,
+ units,
+ failFastOnException,
+ failFastOnInterrupt,
+ poolName,
+ EXECUTOR_FACTORY,
+ errorClassifier);
}
/**
@@ -210,9 +251,11 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
boolean failFastOnException,
boolean failFastOnInterrupt,
String poolName,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory,
+ ErrorClassifier errorClassifier) {
Preconditions.checkNotNull(poolName);
Preconditions.checkNotNull(executorFactory);
+ Preconditions.checkNotNull(errorClassifier);
this.concurrent = concurrent;
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
@@ -223,6 +266,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
new ExecutorParams(
parallelism, keepAliveTime, units, poolName, new BlockingStack<Runnable>()))
: null;
+ this.errorClassifier = errorClassifier;
}
/**
@@ -255,7 +299,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
failFastOnException,
true,
poolName,
- EXECUTOR_FACTORY);
+ EXECUTOR_FACTORY,
+ ErrorClassifier.DEFAULT);
}
/**
@@ -280,7 +325,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
executor,
shutdownOnCompletion,
failFastOnException,
- failFastOnInterrupt);
+ failFastOnInterrupt,
+ ErrorClassifier.DEFAULT);
}
/**
@@ -307,6 +353,36 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
this.failFastOnInterrupt = failFastOnInterrupt;
this.ownExecutorService = shutdownOnCompletion;
this.pool = executor;
+ this.errorClassifier = ErrorClassifier.DEFAULT;
+ }
+
+ /**
+ * Create the AbstractQueueVisitor.
+ *
+ * @param concurrent if false, run tasks inline instead of using the thread pool.
+ * @param executor The ThreadPool to use.
+ * @param shutdownOnCompletion If true, pass ownership of the Threadpool to
+ * this class. The pool will be shut down after a
+ * call to work(). Callers must not shut down the
+ * threadpool while queue visitors use it.
+ * @param failFastOnException if true, don't run new actions after
+ * an uncaught exception.
+ * @param failFastOnInterrupt if true, don't run new actions after interrupt.
+ * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
+ */
+ public AbstractQueueVisitor(
+ boolean concurrent,
+ ThreadPoolExecutor executor,
+ boolean shutdownOnCompletion,
+ boolean failFastOnException,
+ boolean failFastOnInterrupt,
+ ErrorClassifier errorClassifier) {
+ this.concurrent = concurrent;
+ this.failFastOnException = failFastOnException;
+ this.failFastOnInterrupt = failFastOnInterrupt;
+ this.ownExecutorService = shutdownOnCompletion;
+ this.pool = executor;
+ this.errorClassifier = errorClassifier;
}
/**
@@ -321,7 +397,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* thread naming will be used.
*/
public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) {
- this(true, parallelism, keepAlive, units, false, true, poolName, EXECUTOR_FACTORY);
+ this(
+ true,
+ parallelism,
+ keepAlive,
+ units,
+ false,
+ true,
+ poolName,
+ EXECUTOR_FACTORY,
+ ErrorClassifier.DEFAULT);
}
@@ -612,42 +697,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- /** Classification of an error thrown by an action. */
- protected enum ErrorClassification {
- // All running actions should be stopped.
- CRITICAL,
- // Same as CRITICAL, but also log the error.
- CRITICAL_AND_LOG,
- // Other running actions should be left alone.
- NOT_CRITICAL
- }
-
- /**
- * Classifies {@code e}. {@link Error}s are always classified as {@code CRITICAL_AND_LOG}.
- *
- * <p>Default value - always treat errors as {@code NOT_CRITICAL}. If different behavior is needed
- * then we should override this method in subclasses.
- *
- * @param e the exception object to check
- */
- protected ErrorClassification classifyError(Throwable e) {
- return ErrorClassification.NOT_CRITICAL;
- }
-
- private ErrorClassification classifyErrorInternal(Throwable e) {
- if (e instanceof Error) {
- return ErrorClassification.CRITICAL_AND_LOG;
- }
- return classifyError(e);
- }
-
/**
* If exception is critical then set a flag which signals
* to stop all jobs inside {@link #awaitTermination(boolean)}.
*/
private synchronized void markToStopAllJobsIfNeeded(Throwable e) {
boolean critical = false;
- switch (classifyErrorInternal(e)) {
+ switch (errorClassifier.classify(e)) {
case CRITICAL_AND_LOG:
critical = true;
LOG.log(Level.WARNING, "Found critical error in queue visitor", e);
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java b/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java
new file mode 100644
index 0000000000..250a7adcea
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java
@@ -0,0 +1,59 @@
+// Copyright 2015 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import com.google.common.base.Preconditions;
+
+/** A classifier for {@link Error}s and {@link Exception}s. */
+public abstract class ErrorClassifier {
+
+ /** Classification of an error thrown by an action. */
+ protected enum ErrorClassification {
+ /** All running actions should be stopped.*/
+ CRITICAL,
+ /** Same as CRITICAL, but also log the error.*/
+ CRITICAL_AND_LOG,
+ /** Other running actions should be left alone.*/
+ NOT_CRITICAL
+ }
+
+ /** Always treat exceptions as {@code NOT_CRITICAL}. */
+ public static final ErrorClassifier DEFAULT =
+ new ErrorClassifier() {
+ @Override
+ protected ErrorClassification classifyException(Exception e) {
+ return ErrorClassification.NOT_CRITICAL;
+ }
+ };
+
+ /**
+ * Used by {@link #classify} to classify {@link Exception}s. (Note that {@link Error}s
+ * are always classified as {@code CRITICAL_AND_LOG}.)
+ *
+ * @param e the exception object to check
+ */
+ protected abstract ErrorClassification classifyException(Exception e);
+
+ /**
+ * Classify {@param e}. If {@code e} is an {@link Error}, it will be classified as {@code
+ * CRITICAL_AND_LOG}. Otherwise, calls {@link #classifyException}.
+ */
+ public final ErrorClassification classify(Throwable e) {
+ if (e instanceof Error) {
+ return ErrorClassification.CRITICAL_AND_LOG;
+ }
+ Preconditions.checkArgument(e instanceof Exception, e);
+ return classifyException((Exception) e);
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index 2d18f910fa..3c2c399bd5 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.ExecutorParams;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.Pair;
@@ -61,6 +62,16 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
private static final boolean MUST_EXIST = true;
+ private static final ErrorClassifier errorClassifier =
+ new ErrorClassifier() {
+ @Override
+ protected ErrorClassification classifyException(Exception e) {
+ return e instanceof RuntimeException
+ ? ErrorClassification.CRITICAL_AND_LOG
+ : ErrorClassification.NOT_CRITICAL;
+ }
+ };
+
protected final TGraph graph;
@Nullable protected final EvaluationProgressReceiver invalidationReceiver;
protected final DirtyKeyTracker dirtyKeyTracker;
@@ -81,14 +92,16 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
InvalidationState state,
DirtyKeyTracker dirtyKeyTracker,
Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
- super(/*concurrent=*/true,
- /*parallelism=*/DEFAULT_THREAD_COUNT,
- /*keepAliveTime=*/1,
- /*units=*/TimeUnit.SECONDS,
- /*failFastOnException=*/true,
- /*failFastOnInterrupt=*/true,
+ super(
+ /*concurrent=*/ true,
+ /*parallelism=*/ DEFAULT_THREAD_COUNT,
+ /*keepAliveTime=*/ 1,
+ /*units=*/ TimeUnit.SECONDS,
+ /*failFastOnException=*/ true,
+ /*failFastOnInterrupt=*/ true,
"skyframe-invalidator",
- executorFactory);
+ executorFactory,
+ errorClassifier);
this.graph = Preconditions.checkNotNull(graph);
this.invalidationReceiver = invalidationReceiver;
this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker);
@@ -113,13 +126,6 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
"All dirty nodes should have been processed: %s", pendingVisitations);
}
- @Override
- protected ErrorClassification classifyError(Throwable e) {
- return e instanceof RuntimeException
- ? ErrorClassification.CRITICAL_AND_LOG
- : ErrorClassification.NOT_CRITICAL;
- }
-
protected abstract long count();
protected void informInvalidationReceiver(SkyKey key,
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
index 36231f7241..4908fd629d 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
@@ -30,6 +30,7 @@ import com.google.devtools.build.lib.collect.nestedset.NestedSet;
import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
import com.google.devtools.build.lib.collect.nestedset.NestedSetVisitor;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
@@ -595,29 +596,35 @@ public final class ParallelEvaluator implements Evaluator {
}
}
+ private static final ErrorClassifier VALUE_VISITOR_ERROR_CLASSIFIER =
+ new ErrorClassifier() {
+ @Override
+ protected ErrorClassification classifyException(Exception e) {
+ if (e instanceof SchedulerException) {
+ return ErrorClassification.CRITICAL;
+ }
+ if (e instanceof RuntimeException) {
+ return ErrorClassification.CRITICAL_AND_LOG;
+ }
+ return ErrorClassification.NOT_CRITICAL;
+ }
+ };
+
private class ValueVisitor extends AbstractQueueVisitor {
private AtomicBoolean preventNewEvaluations = new AtomicBoolean(false);
private final Set<SkyKey> inflightNodes = Sets.newConcurrentHashSet();
private final Set<RuntimeException> crashes = Sets.newConcurrentHashSet();
private ValueVisitor(int threadCount) {
- super(/*concurrent*/true,
+ super(
+ /*concurrent*/ true,
threadCount,
- 1, TimeUnit.SECONDS,
- /*failFastOnException*/true,
- /*failFastOnInterrupt*/true,
- "skyframe-evaluator");
- }
-
- @Override
- protected ErrorClassification classifyError(Throwable e) {
- if (e instanceof SchedulerException) {
- return ErrorClassification.CRITICAL;
- }
- if (e instanceof RuntimeException) {
- return ErrorClassification.CRITICAL_AND_LOG;
- }
- return ErrorClassification.NOT_CRITICAL;
+ 1,
+ TimeUnit.SECONDS,
+ /*failFastOnException*/ true,
+ /*failFastOnInterrupt*/ true,
+ "skyframe-evaluator",
+ VALUE_VISITOR_ERROR_CLASSIFIER);
}
protected void waitForCompletion() throws InterruptedException {