diff options
author | Mark Schaller <mschaller@google.com> | 2015-11-02 17:30:32 +0000 |
---|---|---|
committer | David Chen <dzc@google.com> | 2015-11-02 23:18:59 +0000 |
commit | fa2443ce72a3172eefb07925215370416dd4f340 (patch) | |
tree | 68a5e7d688ad5852b54b7c59f7e30574571cc92c /src/main/java/com/google/devtools/build/lib/concurrent | |
parent | 6231d08672f1eceda34455d66b6aaffd18d73e60 (diff) |
Fix AbstractQueueVisitor synchronization, comments, and field names
This fixes the following synchronization issue with
AbstractQueueVisitor's jobsMustBeStoppedField: it was read in
awaitTermination in a block synchronized on zeroRemainingTasks, but
in markToStopAllJobsIfNeeded it was read and written in a block
synchronized on the AQV instance. Now, it is always read or written
in a block synchronized on zeroRemainingTasks, because it is used in
the condition represented by that object.
This also thoroughly cleans up obsolete and irregular documentation in
the class.
--
MOS_MIGRATED_REVID=106849236
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java | 274 |
1 files changed, 129 insertions, 145 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index b082bf20a9..5412221977 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -31,10 +31,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; -/** - * AbstractQueueVisitor is a {@link QuiescingExecutor} implementation that wraps an {@link - * ExecutorService}. - */ +import javax.annotation.Nullable; + +/** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */ public class AbstractQueueVisitor implements QuiescingExecutor { /** @@ -63,13 +62,12 @@ public class AbstractQueueVisitor implements QuiescingExecutor { }; /** - * The first unhandled exception thrown by a worker thread. We save it - * and re-throw it from the main thread to detect bugs faster; - * otherwise worker threads just quietly die. + * The first unhandled exception thrown by a worker thread. We save it and re-throw it from + * the main thread to detect bugs faster; otherwise worker threads just quietly die. * - * Field updates are synchronized; it's - * important to save the first one as it may be more informative than a - * subsequent one, and this is not a performance-critical path. + * Field updates happen only in blocks that are synchronized on the {@link + * AbstractQueueVisitor} object; it's important to save the first one as it may be more + * informative than a subsequent one, and this is not a performance-critical path. */ private volatile Throwable unhandled = null; @@ -82,15 +80,14 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private volatile Throwable catastrophe; /** - * Enables concurrency. For debugging or testing, set this to false - * to avoid thread creation and concurrency. Any deviation in observed - * behaviour is a bug. + * Enables concurrency. For debugging or testing, set this to {@code false} to avoid thread + * creation and concurrency. Any deviation in observed behaviour is a bug. */ private final boolean concurrent; /** * An object used in the manner of a {@link java.util.concurrent.locks.Condition} object, for the - * condition {@code remainingTasks.get() == 0}. + * condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}. * TODO(bazel-team): Replace with an actual {@link java.util.concurrent.locks.Condition} object. */ private final Object zeroRemainingTasks = new Object(); @@ -101,72 +98,63 @@ public class AbstractQueueVisitor implements QuiescingExecutor { */ private final AtomicLong remainingTasks = new AtomicLong(0); - // Map of thread ==> number of jobs executing in the thread. - // Currently used only for interrupt handling. - private final Map<Thread, Long> jobs = Maps.newConcurrentMap(); - /** - * The {@link ExecutorService}. If !{@code concurrent}, always {@code null}. Created lazily on - * first call to {@link #execute(Runnable)}, and removed after call to {@link #awaitQuiescence}. + * Flag used to record when all threads were killed by failed action execution. Only ever + * transitions from {@code false} to {@code true}. + * + * <p>May only be accessed in a block that is synchronized on {@link #zeroRemainingTasks}. */ - private final ExecutorService pool; + private boolean jobsMustBeStopped = false; + + /** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */ + private final Map<Thread, Long> jobs = Maps.newConcurrentMap(); + + /** The {@link ExecutorService}. If !{@code concurrent}, this may be {@code null}. */ + @Nullable private final ExecutorService executorService; /** - * Flag used to record when the main thread (the thread which called - * {@link #awaitQuiescence(boolean)}) is interrupted. + * Flag used to record when the main thread (the thread which called {@link #awaitQuiescence}) + * is interrupted. * - * When this is true, adding tasks to the thread pool will - * fail quietly as a part of the process of shutting down the - * worker threads. + * When this is {@code true}, adding tasks to the {@link ExecutorService} will fail quietly as + * a part of the process of shutting down the worker threads. */ private volatile boolean threadInterrupted = false; /** - * Latches used to signal when the visitor has been interrupted or - * seen an exception. Used only for testing. + * Latches used to signal when the visitor has been interrupted or seen an exception. Used only + * for testing. */ private final CountDownLatch interruptedLatch = new CountDownLatch(1); private final CountDownLatch exceptionLatch = new CountDownLatch(1); - /** - * If true, don't run new actions after an uncaught exception. - */ + /** If {@code true}, don't run new actions after an uncaught exception. */ private final boolean failFastOnException; - /** - * If true, don't run new actions after an interrupt. - */ + /** If {@code true}, don't run new actions after an interrupt. */ private final boolean failFastOnInterrupt; - /** If true, we must shut down the {@link ExecutorService} on completion. */ + /** If {@code true}, shut down the {@link ExecutorService} on completion. */ private final boolean ownExecutorService; - /** - * Flag used to record when all threads were killed by failed action execution. - * - * <p>May only be accessed in a synchronized block. - */ - private boolean jobsMustBeStopped = false; - private final ErrorClassifier errorClassifier; private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName()); /** - * Create the AbstractQueueVisitor. + * Create the {@link AbstractQueueVisitor}. * - * @param concurrent true if concurrency should be enabled. Only set to - * false for debugging. + * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for + * debugging. * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. - * @param keepAliveTime the keep-alive time for the thread pool. + * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable. * @param units the time units of keepAliveTime. - * @param failFastOnException if true, don't run new actions after - * an uncaught exception. - * @param failFastOnInterrupt if true, don't run new actions after interrupt. - * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default - * thread naming will be used. + * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. + * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt. + * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code + * null}, default thread naming will be used. */ public AbstractQueueVisitor( boolean concurrent, @@ -189,20 +177,19 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } /** - * Create the AbstractQueueVisitor. + * Create the {@link AbstractQueueVisitor}. * - * @param concurrent true if concurrency should be enabled. Only set to - * false for debugging. + * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for + * debugging. * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. - * @param keepAliveTime the keep-alive time for the thread pool. + * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable. * @param units the time units of keepAliveTime. - * @param failFastOnException if true, don't run new actions after - * an uncaught exception. - * @param failFastOnInterrupt if true, don't run new actions after interrupt. - * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default - * thread naming will be used. + * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. + * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt. + * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code + * null}, default thread naming will be used. * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. */ public AbstractQueueVisitor( @@ -227,21 +214,21 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } /** - * Create the AbstractQueueVisitor. + * Create the {@link AbstractQueueVisitor}. * - * @param concurrent true if concurrency should be enabled. Only set to - * false for debugging. + * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for + * debugging. * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. - * @param keepAliveTime the keep-alive time for the thread pool. + * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable. * @param units the time units of keepAliveTime. - * @param failFastOnException if true, don't run new actions after an uncaught exception. - * @param failFastOnInterrupt if true, don't run new actions after interrupt. - * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default - * thread naming will be used. + * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. + * @param failFastOnInterrupt if {@code true}, don't run new actions after interrupt. + * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code + * null}, default thread naming will be used. * @param executorFactory the factory for constructing the executor service if {@code concurrent} - * is true. + * is {@code true}. */ public AbstractQueueVisitor( boolean concurrent, @@ -260,7 +247,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { this.failFastOnException = failFastOnException; this.failFastOnInterrupt = failFastOnInterrupt; this.ownExecutorService = true; - this.pool = + this.executorService = concurrent ? executorFactory.apply( new ExecutorParams( @@ -270,19 +257,18 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } /** - * Create the AbstractQueueVisitor. + * Create the {@link AbstractQueueVisitor}. * - * @param concurrent true if concurrency should be enabled. Only set to - * false for debugging. + * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} + * for debugging. * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. - * @param keepAliveTime the keep-alive time for the thread pool. + * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable. * @param units the time units of keepAliveTime. - * @param failFastOnException if true, don't run new actions after - * an uncaught exception. - * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default - * thread naming will be used. + * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. + * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code + * null}, default thread naming will be used. */ public AbstractQueueVisitor( boolean concurrent, @@ -304,25 +290,24 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } /** - * Create the AbstractQueueVisitor. + * Create the {@link AbstractQueueVisitor}. * - * @param executor The ThreadPool to use. - * @param shutdownOnCompletion If true, pass ownership of the Threadpool to - * this class. The pool will be shut down after a - * call to work(). Callers must not shutdown the - * threadpool while queue visitors use it. - * @param failFastOnException if true, don't run new actions after - * an uncaught exception. - * @param failFastOnInterrupt if true, don't run new actions after interrupt. + * @param executorService The {@link ExecutorService} to use. + * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to + * this class. The service will be shut down after a + * call to {@link #awaitQuiescence}. Callers must not shutdown the + * {@link ExecutorService} while queue visitors use it. + * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. + * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt. */ public AbstractQueueVisitor( - ThreadPoolExecutor executor, + ExecutorService executorService, boolean shutdownOnCompletion, boolean failFastOnException, boolean failFastOnInterrupt) { this( /*concurrent=*/ true, - executor, + executorService, shutdownOnCompletion, failFastOnException, failFastOnInterrupt, @@ -330,71 +315,73 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } /** - * Create the AbstractQueueVisitor. + * Create the {@link AbstractQueueVisitor}. * - * @param concurrent if false, run tasks inline instead of using the thread pool. - * @param executor The ThreadPool to use. - * @param shutdownOnCompletion If true, pass ownership of the Threadpool to - * this class. The pool will be shut down after a - * call to work(). Callers must not shut down the - * threadpool while queue visitors use it. - * @param failFastOnException if true, don't run new actions after - * an uncaught exception. - * @param failFastOnInterrupt if true, don't run new actions after interrupt. + * @param concurrent if {@code false}, run tasks inline instead of using the {@link + * ExecutorService}. + * @param executorService The {@link ExecutorService} to use. + * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to + * this class. The service will be shut down after a + * call to {@link #awaitQuiescence}. Callers must not shut down the + * {@link ExecutorService} while queue visitors use it. + * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. + * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt. */ public AbstractQueueVisitor( boolean concurrent, - ThreadPoolExecutor executor, + ExecutorService executorService, boolean shutdownOnCompletion, boolean failFastOnException, boolean failFastOnInterrupt) { + Preconditions.checkArgument(executorService != null || !concurrent); this.concurrent = concurrent; this.failFastOnException = failFastOnException; this.failFastOnInterrupt = failFastOnInterrupt; this.ownExecutorService = shutdownOnCompletion; - this.pool = executor; + this.executorService = executorService; this.errorClassifier = ErrorClassifier.DEFAULT; } /** * Create the AbstractQueueVisitor. * - * @param concurrent if false, run tasks inline instead of using the thread pool. - * @param executor The ThreadPool to use. - * @param shutdownOnCompletion If true, pass ownership of the Threadpool to - * this class. The pool will be shut down after a - * call to work(). Callers must not shut down the - * threadpool while queue visitors use it. - * @param failFastOnException if true, don't run new actions after - * an uncaught exception. - * @param failFastOnInterrupt if true, don't run new actions after interrupt. + * @param concurrent if {@code false}, run tasks inline instead of using the {@link + * ExecutorService}. + * @param executorService The {@link ExecutorService} to use. + * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to + * this class. The service will be shut down after a + * call to {@link #awaitQuiescence}. Callers must not shut down the + * {@link ExecutorService} while queue visitors use it. + * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. + * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt. * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. */ public AbstractQueueVisitor( boolean concurrent, - ThreadPoolExecutor executor, + ExecutorService executorService, boolean shutdownOnCompletion, boolean failFastOnException, boolean failFastOnInterrupt, ErrorClassifier errorClassifier) { + Preconditions.checkArgument(executorService != null || !concurrent); this.concurrent = concurrent; this.failFastOnException = failFastOnException; this.failFastOnInterrupt = failFastOnInterrupt; this.ownExecutorService = shutdownOnCompletion; - this.pool = executor; + this.executorService = executorService; this.errorClassifier = errorClassifier; } /** - * Create the AbstractQueueVisitor with concurrency enabled. + * Create the {@code AbstractQueueVisitor} with concurrency enabled. * * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. - * @param keepAlive the keep-alive time for the thread pool. + * @param keepAlive the keep-alive time for the {@link ExecutorService}, if applicable. * @param units the time units of keepAliveTime. - * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default - * thread naming will be used. + * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code + * null}, default thread naming will be used. */ public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) { this( @@ -421,10 +408,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } - /** - * Schedules a call. - * Called in a worker thread if concurrent. - */ + /** Schedules a call. Called in a worker thread if concurrent. */ @Override public final void execute(Runnable runnable) { if (concurrent) { @@ -438,7 +422,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { tasks > 0, "Incrementing remaining tasks counter resulted in impossible non-positive number %s", tasks); - pool.execute(wrapRunnable(runnable, ranTask)); + executeRunnable(wrapRunnable(runnable, ranTask)); } catch (Throwable e) { if (!ranTask.get()) { // Note that keeping track of ranTask is necessary to disambiguate the case where @@ -453,6 +437,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } + protected void executeRunnable(Runnable runnable) { + executorService.execute(runnable); + } + private void recordError(Throwable e) { try { // If threadInterrupted is true, then RejectedExecutionExceptions are expected. There's no @@ -545,9 +533,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } - /** - * Set an internal flag to show that an interrupt was detected. - */ + /** Set an internal flag to show that an interrupt was detected. */ private void setInterrupted() { threadInterrupted = true; } @@ -567,36 +553,30 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } - /** - * If this returns true, don't enqueue new actions. - */ + /** If this returns true, don't enqueue new actions. */ protected boolean blockNewActions() { return (failFastOnInterrupt && isInterrupted()) || (unhandled != null && failFastOnException); } - /** Get latch that is released when exception is received by visitor. Used only in tests. */ @VisibleForTesting - public CountDownLatch getExceptionLatchForTestingOnly() { + public final CountDownLatch getExceptionLatchForTestingOnly() { return exceptionLatch; } - /** Get latch that is released when interruption is received by visitor. Used only in tests. */ @VisibleForTesting - public CountDownLatch getInterruptionLatchForTestingOnly() { + public final CountDownLatch getInterruptionLatchForTestingOnly() { return interruptedLatch; } - /** - * Get the value of the interrupted flag. - */ + /** Get the value of the interrupted flag. */ @ThreadSafety.ThreadSafe protected final boolean isInterrupted() { return threadInterrupted; } /** - * Get number of jobs remaining. Note that this can increase in value - * if running tasks submit further jobs. + * Get number of jobs remaining. Note that this can increase in value if running tasks submit + * further jobs. */ @VisibleForTesting protected final long getTaskCount() { @@ -604,7 +584,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } /** - * Waits for the task queue to drain, then shuts down the thread pool and + * Waits for the task queue to drain, then shuts down the {@link ExecutorService} and * waits for it to terminate. Throws (the same) unchecked exception if any * worker thread failed unexpectedly. */ @@ -666,11 +646,11 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } if (ownExecutorService) { - pool.shutdown(); + executorService.shutdown(); for (;;) { try { Throwables.propagateIfPossible(catastrophe); - pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); + executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); break; } catch (InterruptedException e) { setInterrupted(); @@ -689,10 +669,14 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } /** - * If exception is critical then set a flag which signals - * to stop all jobs inside {@link #awaitTermination(boolean)}. + * Classifies a {@link Throwable} {@param e} thrown by a job. + * + * <p>If it is classified as critical, then this sets the {@link #jobsMustBeStopped} flag to + * {@code true} which signals {@link #awaitTermination(boolean)} to stop all jobs. + * + * <p>Also logs details about {@param e} if it is classified as something that must be logged. */ - private synchronized void markToStopAllJobsIfNeeded(Throwable e) { + private void markToStopAllJobsIfNeeded(Throwable e) { boolean critical = false; switch (errorClassifier.classify(e)) { case CRITICAL_AND_LOG: @@ -705,9 +689,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor { default: break; } - if (critical && !jobsMustBeStopped) { - jobsMustBeStopped = true; - synchronized (zeroRemainingTasks) { + synchronized (zeroRemainingTasks) { + if (critical && !jobsMustBeStopped) { + jobsMustBeStopped = true; zeroRemainingTasks.notify(); } } |