aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/concurrent
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2015-11-02 17:30:32 +0000
committerGravatar David Chen <dzc@google.com>2015-11-02 23:18:59 +0000
commitfa2443ce72a3172eefb07925215370416dd4f340 (patch)
tree68a5e7d688ad5852b54b7c59f7e30574571cc92c /src/main/java/com/google/devtools/build/lib/concurrent
parent6231d08672f1eceda34455d66b6aaffd18d73e60 (diff)
Fix AbstractQueueVisitor synchronization, comments, and field names
This fixes the following synchronization issue with AbstractQueueVisitor's jobsMustBeStoppedField: it was read in awaitTermination in a block synchronized on zeroRemainingTasks, but in markToStopAllJobsIfNeeded it was read and written in a block synchronized on the AQV instance. Now, it is always read or written in a block synchronized on zeroRemainingTasks, because it is used in the condition represented by that object. This also thoroughly cleans up obsolete and irregular documentation in the class. -- MOS_MIGRATED_REVID=106849236
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java274
1 files changed, 129 insertions, 145 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index b082bf20a9..5412221977 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -31,10 +31,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
-/**
- * AbstractQueueVisitor is a {@link QuiescingExecutor} implementation that wraps an {@link
- * ExecutorService}.
- */
+import javax.annotation.Nullable;
+
+/** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */
public class AbstractQueueVisitor implements QuiescingExecutor {
/**
@@ -63,13 +62,12 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
};
/**
- * The first unhandled exception thrown by a worker thread. We save it
- * and re-throw it from the main thread to detect bugs faster;
- * otherwise worker threads just quietly die.
+ * The first unhandled exception thrown by a worker thread. We save it and re-throw it from
+ * the main thread to detect bugs faster; otherwise worker threads just quietly die.
*
- * Field updates are synchronized; it's
- * important to save the first one as it may be more informative than a
- * subsequent one, and this is not a performance-critical path.
+ * Field updates happen only in blocks that are synchronized on the {@link
+ * AbstractQueueVisitor} object; it's important to save the first one as it may be more
+ * informative than a subsequent one, and this is not a performance-critical path.
*/
private volatile Throwable unhandled = null;
@@ -82,15 +80,14 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private volatile Throwable catastrophe;
/**
- * Enables concurrency. For debugging or testing, set this to false
- * to avoid thread creation and concurrency. Any deviation in observed
- * behaviour is a bug.
+ * Enables concurrency. For debugging or testing, set this to {@code false} to avoid thread
+ * creation and concurrency. Any deviation in observed behaviour is a bug.
*/
private final boolean concurrent;
/**
* An object used in the manner of a {@link java.util.concurrent.locks.Condition} object, for the
- * condition {@code remainingTasks.get() == 0}.
+ * condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}.
* TODO(bazel-team): Replace with an actual {@link java.util.concurrent.locks.Condition} object.
*/
private final Object zeroRemainingTasks = new Object();
@@ -101,72 +98,63 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
*/
private final AtomicLong remainingTasks = new AtomicLong(0);
- // Map of thread ==> number of jobs executing in the thread.
- // Currently used only for interrupt handling.
- private final Map<Thread, Long> jobs = Maps.newConcurrentMap();
-
/**
- * The {@link ExecutorService}. If !{@code concurrent}, always {@code null}. Created lazily on
- * first call to {@link #execute(Runnable)}, and removed after call to {@link #awaitQuiescence}.
+ * Flag used to record when all threads were killed by failed action execution. Only ever
+ * transitions from {@code false} to {@code true}.
+ *
+ * <p>May only be accessed in a block that is synchronized on {@link #zeroRemainingTasks}.
*/
- private final ExecutorService pool;
+ private boolean jobsMustBeStopped = false;
+
+ /** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */
+ private final Map<Thread, Long> jobs = Maps.newConcurrentMap();
+
+ /** The {@link ExecutorService}. If !{@code concurrent}, this may be {@code null}. */
+ @Nullable private final ExecutorService executorService;
/**
- * Flag used to record when the main thread (the thread which called
- * {@link #awaitQuiescence(boolean)}) is interrupted.
+ * Flag used to record when the main thread (the thread which called {@link #awaitQuiescence})
+ * is interrupted.
*
- * When this is true, adding tasks to the thread pool will
- * fail quietly as a part of the process of shutting down the
- * worker threads.
+ * When this is {@code true}, adding tasks to the {@link ExecutorService} will fail quietly as
+ * a part of the process of shutting down the worker threads.
*/
private volatile boolean threadInterrupted = false;
/**
- * Latches used to signal when the visitor has been interrupted or
- * seen an exception. Used only for testing.
+ * Latches used to signal when the visitor has been interrupted or seen an exception. Used only
+ * for testing.
*/
private final CountDownLatch interruptedLatch = new CountDownLatch(1);
private final CountDownLatch exceptionLatch = new CountDownLatch(1);
- /**
- * If true, don't run new actions after an uncaught exception.
- */
+ /** If {@code true}, don't run new actions after an uncaught exception. */
private final boolean failFastOnException;
- /**
- * If true, don't run new actions after an interrupt.
- */
+ /** If {@code true}, don't run new actions after an interrupt. */
private final boolean failFastOnInterrupt;
- /** If true, we must shut down the {@link ExecutorService} on completion. */
+ /** If {@code true}, shut down the {@link ExecutorService} on completion. */
private final boolean ownExecutorService;
- /**
- * Flag used to record when all threads were killed by failed action execution.
- *
- * <p>May only be accessed in a synchronized block.
- */
- private boolean jobsMustBeStopped = false;
-
private final ErrorClassifier errorClassifier;
private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName());
/**
- * Create the AbstractQueueVisitor.
+ * Create the {@link AbstractQueueVisitor}.
*
- * @param concurrent true if concurrency should be enabled. Only set to
- * false for debugging.
+ * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for
+ * debugging.
* @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
* parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
* corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAliveTime the keep-alive time for the thread pool.
+ * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable.
* @param units the time units of keepAliveTime.
- * @param failFastOnException if true, don't run new actions after
- * an uncaught exception.
- * @param failFastOnInterrupt if true, don't run new actions after interrupt.
- * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
- * thread naming will be used.
+ * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
+ * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt.
+ * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
+ * null}, default thread naming will be used.
*/
public AbstractQueueVisitor(
boolean concurrent,
@@ -189,20 +177,19 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/**
- * Create the AbstractQueueVisitor.
+ * Create the {@link AbstractQueueVisitor}.
*
- * @param concurrent true if concurrency should be enabled. Only set to
- * false for debugging.
+ * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for
+ * debugging.
* @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
* parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
* corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAliveTime the keep-alive time for the thread pool.
+ * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable.
* @param units the time units of keepAliveTime.
- * @param failFastOnException if true, don't run new actions after
- * an uncaught exception.
- * @param failFastOnInterrupt if true, don't run new actions after interrupt.
- * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
- * thread naming will be used.
+ * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
+ * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt.
+ * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
+ * null}, default thread naming will be used.
* @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
*/
public AbstractQueueVisitor(
@@ -227,21 +214,21 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/**
- * Create the AbstractQueueVisitor.
+ * Create the {@link AbstractQueueVisitor}.
*
- * @param concurrent true if concurrency should be enabled. Only set to
- * false for debugging.
+ * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for
+ * debugging.
* @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
* parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
* corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAliveTime the keep-alive time for the thread pool.
+ * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable.
* @param units the time units of keepAliveTime.
- * @param failFastOnException if true, don't run new actions after an uncaught exception.
- * @param failFastOnInterrupt if true, don't run new actions after interrupt.
- * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
- * thread naming will be used.
+ * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
+ * @param failFastOnInterrupt if {@code true}, don't run new actions after interrupt.
+ * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
+ * null}, default thread naming will be used.
* @param executorFactory the factory for constructing the executor service if {@code concurrent}
- * is true.
+ * is {@code true}.
*/
public AbstractQueueVisitor(
boolean concurrent,
@@ -260,7 +247,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
this.ownExecutorService = true;
- this.pool =
+ this.executorService =
concurrent
? executorFactory.apply(
new ExecutorParams(
@@ -270,19 +257,18 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/**
- * Create the AbstractQueueVisitor.
+ * Create the {@link AbstractQueueVisitor}.
*
- * @param concurrent true if concurrency should be enabled. Only set to
- * false for debugging.
+ * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false}
+ * for debugging.
* @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
* parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
* corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAliveTime the keep-alive time for the thread pool.
+ * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable.
* @param units the time units of keepAliveTime.
- * @param failFastOnException if true, don't run new actions after
- * an uncaught exception.
- * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
- * thread naming will be used.
+ * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
+ * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
+ * null}, default thread naming will be used.
*/
public AbstractQueueVisitor(
boolean concurrent,
@@ -304,25 +290,24 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/**
- * Create the AbstractQueueVisitor.
+ * Create the {@link AbstractQueueVisitor}.
*
- * @param executor The ThreadPool to use.
- * @param shutdownOnCompletion If true, pass ownership of the Threadpool to
- * this class. The pool will be shut down after a
- * call to work(). Callers must not shutdown the
- * threadpool while queue visitors use it.
- * @param failFastOnException if true, don't run new actions after
- * an uncaught exception.
- * @param failFastOnInterrupt if true, don't run new actions after interrupt.
+ * @param executorService The {@link ExecutorService} to use.
+ * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to
+ * this class. The service will be shut down after a
+ * call to {@link #awaitQuiescence}. Callers must not shutdown the
+ * {@link ExecutorService} while queue visitors use it.
+ * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
+ * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt.
*/
public AbstractQueueVisitor(
- ThreadPoolExecutor executor,
+ ExecutorService executorService,
boolean shutdownOnCompletion,
boolean failFastOnException,
boolean failFastOnInterrupt) {
this(
/*concurrent=*/ true,
- executor,
+ executorService,
shutdownOnCompletion,
failFastOnException,
failFastOnInterrupt,
@@ -330,71 +315,73 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/**
- * Create the AbstractQueueVisitor.
+ * Create the {@link AbstractQueueVisitor}.
*
- * @param concurrent if false, run tasks inline instead of using the thread pool.
- * @param executor The ThreadPool to use.
- * @param shutdownOnCompletion If true, pass ownership of the Threadpool to
- * this class. The pool will be shut down after a
- * call to work(). Callers must not shut down the
- * threadpool while queue visitors use it.
- * @param failFastOnException if true, don't run new actions after
- * an uncaught exception.
- * @param failFastOnInterrupt if true, don't run new actions after interrupt.
+ * @param concurrent if {@code false}, run tasks inline instead of using the {@link
+ * ExecutorService}.
+ * @param executorService The {@link ExecutorService} to use.
+ * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to
+ * this class. The service will be shut down after a
+ * call to {@link #awaitQuiescence}. Callers must not shut down the
+ * {@link ExecutorService} while queue visitors use it.
+ * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
+ * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt.
*/
public AbstractQueueVisitor(
boolean concurrent,
- ThreadPoolExecutor executor,
+ ExecutorService executorService,
boolean shutdownOnCompletion,
boolean failFastOnException,
boolean failFastOnInterrupt) {
+ Preconditions.checkArgument(executorService != null || !concurrent);
this.concurrent = concurrent;
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
this.ownExecutorService = shutdownOnCompletion;
- this.pool = executor;
+ this.executorService = executorService;
this.errorClassifier = ErrorClassifier.DEFAULT;
}
/**
* Create the AbstractQueueVisitor.
*
- * @param concurrent if false, run tasks inline instead of using the thread pool.
- * @param executor The ThreadPool to use.
- * @param shutdownOnCompletion If true, pass ownership of the Threadpool to
- * this class. The pool will be shut down after a
- * call to work(). Callers must not shut down the
- * threadpool while queue visitors use it.
- * @param failFastOnException if true, don't run new actions after
- * an uncaught exception.
- * @param failFastOnInterrupt if true, don't run new actions after interrupt.
+ * @param concurrent if {@code false}, run tasks inline instead of using the {@link
+ * ExecutorService}.
+ * @param executorService The {@link ExecutorService} to use.
+ * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to
+ * this class. The service will be shut down after a
+ * call to {@link #awaitQuiescence}. Callers must not shut down the
+ * {@link ExecutorService} while queue visitors use it.
+ * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
+ * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt.
* @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
*/
public AbstractQueueVisitor(
boolean concurrent,
- ThreadPoolExecutor executor,
+ ExecutorService executorService,
boolean shutdownOnCompletion,
boolean failFastOnException,
boolean failFastOnInterrupt,
ErrorClassifier errorClassifier) {
+ Preconditions.checkArgument(executorService != null || !concurrent);
this.concurrent = concurrent;
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
this.ownExecutorService = shutdownOnCompletion;
- this.pool = executor;
+ this.executorService = executorService;
this.errorClassifier = errorClassifier;
}
/**
- * Create the AbstractQueueVisitor with concurrency enabled.
+ * Create the {@code AbstractQueueVisitor} with concurrency enabled.
*
* @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
* parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
* corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAlive the keep-alive time for the thread pool.
+ * @param keepAlive the keep-alive time for the {@link ExecutorService}, if applicable.
* @param units the time units of keepAliveTime.
- * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
- * thread naming will be used.
+ * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
+ * null}, default thread naming will be used.
*/
public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) {
this(
@@ -421,10 +408,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- /**
- * Schedules a call.
- * Called in a worker thread if concurrent.
- */
+ /** Schedules a call. Called in a worker thread if concurrent. */
@Override
public final void execute(Runnable runnable) {
if (concurrent) {
@@ -438,7 +422,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
tasks > 0,
"Incrementing remaining tasks counter resulted in impossible non-positive number %s",
tasks);
- pool.execute(wrapRunnable(runnable, ranTask));
+ executeRunnable(wrapRunnable(runnable, ranTask));
} catch (Throwable e) {
if (!ranTask.get()) {
// Note that keeping track of ranTask is necessary to disambiguate the case where
@@ -453,6 +437,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
+ protected void executeRunnable(Runnable runnable) {
+ executorService.execute(runnable);
+ }
+
private void recordError(Throwable e) {
try {
// If threadInterrupted is true, then RejectedExecutionExceptions are expected. There's no
@@ -545,9 +533,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- /**
- * Set an internal flag to show that an interrupt was detected.
- */
+ /** Set an internal flag to show that an interrupt was detected. */
private void setInterrupted() {
threadInterrupted = true;
}
@@ -567,36 +553,30 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- /**
- * If this returns true, don't enqueue new actions.
- */
+ /** If this returns true, don't enqueue new actions. */
protected boolean blockNewActions() {
return (failFastOnInterrupt && isInterrupted()) || (unhandled != null && failFastOnException);
}
- /** Get latch that is released when exception is received by visitor. Used only in tests. */
@VisibleForTesting
- public CountDownLatch getExceptionLatchForTestingOnly() {
+ public final CountDownLatch getExceptionLatchForTestingOnly() {
return exceptionLatch;
}
- /** Get latch that is released when interruption is received by visitor. Used only in tests. */
@VisibleForTesting
- public CountDownLatch getInterruptionLatchForTestingOnly() {
+ public final CountDownLatch getInterruptionLatchForTestingOnly() {
return interruptedLatch;
}
- /**
- * Get the value of the interrupted flag.
- */
+ /** Get the value of the interrupted flag. */
@ThreadSafety.ThreadSafe
protected final boolean isInterrupted() {
return threadInterrupted;
}
/**
- * Get number of jobs remaining. Note that this can increase in value
- * if running tasks submit further jobs.
+ * Get number of jobs remaining. Note that this can increase in value if running tasks submit
+ * further jobs.
*/
@VisibleForTesting
protected final long getTaskCount() {
@@ -604,7 +584,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/**
- * Waits for the task queue to drain, then shuts down the thread pool and
+ * Waits for the task queue to drain, then shuts down the {@link ExecutorService} and
* waits for it to terminate. Throws (the same) unchecked exception if any
* worker thread failed unexpectedly.
*/
@@ -666,11 +646,11 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
if (ownExecutorService) {
- pool.shutdown();
+ executorService.shutdown();
for (;;) {
try {
Throwables.propagateIfPossible(catastrophe);
- pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
+ executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
break;
} catch (InterruptedException e) {
setInterrupted();
@@ -689,10 +669,14 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/**
- * If exception is critical then set a flag which signals
- * to stop all jobs inside {@link #awaitTermination(boolean)}.
+ * Classifies a {@link Throwable} {@param e} thrown by a job.
+ *
+ * <p>If it is classified as critical, then this sets the {@link #jobsMustBeStopped} flag to
+ * {@code true} which signals {@link #awaitTermination(boolean)} to stop all jobs.
+ *
+ * <p>Also logs details about {@param e} if it is classified as something that must be logged.
*/
- private synchronized void markToStopAllJobsIfNeeded(Throwable e) {
+ private void markToStopAllJobsIfNeeded(Throwable e) {
boolean critical = false;
switch (errorClassifier.classify(e)) {
case CRITICAL_AND_LOG:
@@ -705,9 +689,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
default:
break;
}
- if (critical && !jobsMustBeStopped) {
- jobsMustBeStopped = true;
- synchronized (zeroRemainingTasks) {
+ synchronized (zeroRemainingTasks) {
+ if (critical && !jobsMustBeStopped) {
+ jobsMustBeStopped = true;
zeroRemainingTasks.notify();
}
}