aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2015-10-30 22:37:04 +0000
committerGravatar Florian Weikert <fwe@google.com>2015-11-02 16:55:33 +0000
commit1aa49da5e5f9e98ce3a2a0b96f387f2990a2363f (patch)
treeffca8b78242da3ccfc99959ea890c0cbdbf3c8ae /src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
parent5350b3154665ebb37e10e6261686eb646ea23220 (diff)
Introduce ErrorClassifier
Changes the AbstractQueueVisitor strategy for varying its response to unhandled exceptions from inheritance to composition. This will help with a forthcoming switch from inheritance to delegation for ValueVisitor's use of AbstractQueueVisitor. -- MOS_MIGRATED_REVID=106730708
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java126
1 files changed, 91 insertions, 35 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 9f49798bde..d5b6f1a7e9 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -148,6 +148,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
*/
private boolean jobsMustBeStopped = false;
+ private final ErrorClassifier errorClassifier;
+
private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName());
/**
@@ -182,7 +184,46 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
failFastOnException,
failFastOnInterrupt,
poolName,
- EXECUTOR_FACTORY);
+ EXECUTOR_FACTORY,
+ ErrorClassifier.DEFAULT);
+ }
+
+ /**
+ * Create the AbstractQueueVisitor.
+ *
+ * @param concurrent true if concurrency should be enabled. Only set to
+ * false for debugging.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
+ * @param keepAliveTime the keep-alive time for the thread pool.
+ * @param units the time units of keepAliveTime.
+ * @param failFastOnException if true, don't run new actions after
+ * an uncaught exception.
+ * @param failFastOnInterrupt if true, don't run new actions after interrupt.
+ * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
+ * thread naming will be used.
+ * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
+ */
+ public AbstractQueueVisitor(
+ boolean concurrent,
+ int parallelism,
+ long keepAliveTime,
+ TimeUnit units,
+ boolean failFastOnException,
+ boolean failFastOnInterrupt,
+ String poolName,
+ ErrorClassifier errorClassifier) {
+ this(
+ concurrent,
+ parallelism,
+ keepAliveTime,
+ units,
+ failFastOnException,
+ failFastOnInterrupt,
+ poolName,
+ EXECUTOR_FACTORY,
+ errorClassifier);
}
/**
@@ -210,9 +251,11 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
boolean failFastOnException,
boolean failFastOnInterrupt,
String poolName,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory,
+ ErrorClassifier errorClassifier) {
Preconditions.checkNotNull(poolName);
Preconditions.checkNotNull(executorFactory);
+ Preconditions.checkNotNull(errorClassifier);
this.concurrent = concurrent;
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
@@ -223,6 +266,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
new ExecutorParams(
parallelism, keepAliveTime, units, poolName, new BlockingStack<Runnable>()))
: null;
+ this.errorClassifier = errorClassifier;
}
/**
@@ -255,7 +299,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
failFastOnException,
true,
poolName,
- EXECUTOR_FACTORY);
+ EXECUTOR_FACTORY,
+ ErrorClassifier.DEFAULT);
}
/**
@@ -280,7 +325,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
executor,
shutdownOnCompletion,
failFastOnException,
- failFastOnInterrupt);
+ failFastOnInterrupt,
+ ErrorClassifier.DEFAULT);
}
/**
@@ -307,6 +353,36 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
this.failFastOnInterrupt = failFastOnInterrupt;
this.ownExecutorService = shutdownOnCompletion;
this.pool = executor;
+ this.errorClassifier = ErrorClassifier.DEFAULT;
+ }
+
+ /**
+ * Create the AbstractQueueVisitor.
+ *
+ * @param concurrent if false, run tasks inline instead of using the thread pool.
+ * @param executor The ThreadPool to use.
+ * @param shutdownOnCompletion If true, pass ownership of the Threadpool to
+ * this class. The pool will be shut down after a
+ * call to work(). Callers must not shut down the
+ * threadpool while queue visitors use it.
+ * @param failFastOnException if true, don't run new actions after
+ * an uncaught exception.
+ * @param failFastOnInterrupt if true, don't run new actions after interrupt.
+ * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
+ */
+ public AbstractQueueVisitor(
+ boolean concurrent,
+ ThreadPoolExecutor executor,
+ boolean shutdownOnCompletion,
+ boolean failFastOnException,
+ boolean failFastOnInterrupt,
+ ErrorClassifier errorClassifier) {
+ this.concurrent = concurrent;
+ this.failFastOnException = failFastOnException;
+ this.failFastOnInterrupt = failFastOnInterrupt;
+ this.ownExecutorService = shutdownOnCompletion;
+ this.pool = executor;
+ this.errorClassifier = errorClassifier;
}
/**
@@ -321,7 +397,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* thread naming will be used.
*/
public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) {
- this(true, parallelism, keepAlive, units, false, true, poolName, EXECUTOR_FACTORY);
+ this(
+ true,
+ parallelism,
+ keepAlive,
+ units,
+ false,
+ true,
+ poolName,
+ EXECUTOR_FACTORY,
+ ErrorClassifier.DEFAULT);
}
@@ -612,42 +697,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- /** Classification of an error thrown by an action. */
- protected enum ErrorClassification {
- // All running actions should be stopped.
- CRITICAL,
- // Same as CRITICAL, but also log the error.
- CRITICAL_AND_LOG,
- // Other running actions should be left alone.
- NOT_CRITICAL
- }
-
- /**
- * Classifies {@code e}. {@link Error}s are always classified as {@code CRITICAL_AND_LOG}.
- *
- * <p>Default value - always treat errors as {@code NOT_CRITICAL}. If different behavior is needed
- * then we should override this method in subclasses.
- *
- * @param e the exception object to check
- */
- protected ErrorClassification classifyError(Throwable e) {
- return ErrorClassification.NOT_CRITICAL;
- }
-
- private ErrorClassification classifyErrorInternal(Throwable e) {
- if (e instanceof Error) {
- return ErrorClassification.CRITICAL_AND_LOG;
- }
- return classifyError(e);
- }
-
/**
* If exception is critical then set a flag which signals
* to stop all jobs inside {@link #awaitTermination(boolean)}.
*/
private synchronized void markToStopAllJobsIfNeeded(Throwable e) {
boolean critical = false;
- switch (classifyErrorInternal(e)) {
+ switch (errorClassifier.classify(e)) {
case CRITICAL_AND_LOG:
critical = true;
LOG.log(Level.WARNING, "Found critical error in queue visitor", e);