diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java | 158 |
1 files changed, 82 insertions, 76 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index 87f85e3aa6..08903acd2c 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -24,8 +24,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,9 +34,9 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * AbstractQueueVisitor is a wrapper around {@link ThreadPoolExecutor} which - * delays thread pool shutdown until entire visitation is complete. - * This is useful for cases in which worker tasks may submit additional tasks. + * AbstractQueueVisitor is a wrapper around {@link ExecutorService} which delays service shutdown + * until entire visitation is complete. This is useful for cases in which worker tasks may submit + * additional tasks. * * <p>Consider the following example: * <pre> @@ -56,14 +57,26 @@ import java.util.logging.Logger; public class AbstractQueueVisitor { /** - * Default factory function for constructing {@link ThreadPoolExecutor}s. + * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link + * ThreadPoolExecutor}s this creates have the same value for {@code corePoolSize} and {@code + * maximumPoolSize} because that results in a fixed-size thread pool, and the current use cases + * for {@link AbstractQueueVisitor} don't require any more sophisticated thread pool size + * management. + * + * <p>If client use cases change, they may invoke one of the {@link + * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link + * ThreadPoolExecutor}. */ - public static final Function<ThreadPoolExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY = - new Function<ThreadPoolExecutorParams, ThreadPoolExecutor>() { + public static final Function<ExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY = + new Function<ExecutorParams, ThreadPoolExecutor>() { @Override - public ThreadPoolExecutor apply(ThreadPoolExecutorParams p) { - return new ThreadPoolExecutor(p.getCorePoolSize(), p.getMaxPoolSize(), - p.getKeepAliveTime(), p.getUnits(), p.getWorkQueue(), + public ThreadPoolExecutor apply(ExecutorParams p) { + return new ThreadPoolExecutor( + /*corePoolSize=*/ p.getParallelism(), + /*maximumPoolSize=*/ p.getParallelism(), + p.getKeepAliveTime(), + p.getUnits(), + p.getWorkQueue(), new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build()); } }; @@ -80,10 +93,10 @@ public class AbstractQueueVisitor { private volatile Throwable unhandled = null; /** - * An uncaught exception when submitting a job to the ThreadPool is catastrophic, and usually - * indicates a lack of stack space on which to allocate a native thread. The JDK - * ThreadPoolExecutor may reach an inconsistent state in such circumstances, so we avoid blocking - * on its termination when this field is non-null. + * An uncaught exception when submitting a job to the {@link ExecutorService} is catastrophic, + * and usually indicates a lack of stack space on which to allocate a native thread. The {@link + * ExecutorService} may reach an inconsistent state in such circumstances, so we avoid blocking + * on its termination when this field is non-{@code null}. */ private volatile Throwable catastrophe; @@ -103,11 +116,10 @@ public class AbstractQueueVisitor { private final Map<Thread, Long> jobs = Maps.newConcurrentMap(); /** - * The thread pool. If !concurrent, always null. Created lazily on first - * call to {@link #enqueue(Runnable)}, and removed after call to - * {@link #work(boolean)}. + * The {@link ExecutorService}. If !{@code concurrent}, always {@code null}. Created lazily on + * first call to {@link #enqueue(Runnable)}, and removed after call to {@link #work(boolean)}. */ - private final ThreadPoolExecutor pool; + private final ExecutorService pool; /** * Flag used to record when the main thread (the thread which called @@ -136,10 +148,8 @@ public class AbstractQueueVisitor { */ private final boolean failFastOnInterrupt; - /** - * If true, we must shut down the thread pool on completion. - */ - private final boolean ownThreadPool; + /** If true, we must shut down the {@link ExecutorService} on completion. */ + private final boolean ownExecutorService; /** * Flag used to record when all threads were killed by failed action execution. @@ -155,10 +165,9 @@ public class AbstractQueueVisitor { * * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param failFastOnException if true, don't run new actions after @@ -167,10 +176,10 @@ public class AbstractQueueVisitor { * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. */ - public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, + public AbstractQueueVisitor(boolean concurrent, int parallelism, long keepAliveTime, TimeUnit units, boolean failFastOnException, boolean failFastOnInterrupt, String poolName) { - this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException, + this(concurrent, parallelism, keepAliveTime, units, failFastOnException, failFastOnInterrupt, poolName, EXECUTOR_FACTORY); } @@ -179,33 +188,39 @@ public class AbstractQueueVisitor { * * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param failFastOnException if true, don't run new actions after an uncaught exception. * @param failFastOnInterrupt if true, don't run new actions after interrupt. * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. - * @param executorFactory the factory for constructing the thread pool if {@code concurrent} is - * true. + * @param executorFactory the factory for constructing the executor service if {@code concurrent} + * is true. */ - public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, - long keepAliveTime, TimeUnit units, boolean failFastOnException, - boolean failFastOnInterrupt, String poolName, - Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) { + public AbstractQueueVisitor( + boolean concurrent, + int parallelism, + long keepAliveTime, + TimeUnit units, + boolean failFastOnException, + boolean failFastOnInterrupt, + String poolName, + Function<ExecutorParams, ? extends ExecutorService> executorFactory) { Preconditions.checkNotNull(poolName); Preconditions.checkNotNull(executorFactory); this.concurrent = concurrent; this.failFastOnException = failFastOnException; this.failFastOnInterrupt = failFastOnInterrupt; - this.ownThreadPool = true; - this.pool = concurrent - ? executorFactory.apply(new ThreadPoolExecutorParams(corePoolSize, maxPoolSize, - keepAliveTime, units, poolName, getWorkQueue())) - : null; + this.ownExecutorService = true; + this.pool = + concurrent + ? executorFactory.apply( + new ExecutorParams( + parallelism, keepAliveTime, units, poolName, getWorkQueue())) + : null; } /** @@ -213,10 +228,9 @@ public class AbstractQueueVisitor { * * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param failFastOnException if true, don't run new actions after @@ -224,9 +238,9 @@ public class AbstractQueueVisitor { * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. */ - public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, + public AbstractQueueVisitor(boolean concurrent, int parallelism, long keepAliveTime, TimeUnit units, boolean failFastOnException, String poolName) { - this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException, true, + this(concurrent, parallelism, keepAliveTime, units, failFastOnException, true, poolName); } @@ -268,7 +282,7 @@ public class AbstractQueueVisitor { this.failFastOnException = failFastOnException; this.failFastOnInterrupt = failFastOnInterrupt; this.pool = executor; - this.ownThreadPool = shutdownOnCompletion; + this.ownExecutorService = shutdownOnCompletion; } public AbstractQueueVisitor(ThreadPoolExecutor executor, boolean failFastOnException) { @@ -280,35 +294,33 @@ public class AbstractQueueVisitor { * * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. */ - public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, + public AbstractQueueVisitor(boolean concurrent, int parallelism, long keepAliveTime, TimeUnit units, String poolName) { - this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, false, poolName); + this(concurrent, parallelism, keepAliveTime, units, false, poolName); } /** * Create the AbstractQueueVisitor with concurrency enabled. * - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAlive the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. */ - public AbstractQueueVisitor(int corePoolSize, int maxPoolSize, long keepAlive, TimeUnit units, + public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) { - this(true, corePoolSize, maxPoolSize, keepAlive, units, poolName); + this(true, parallelism, keepAlive, units, poolName); } protected BlockingQueue<Runnable> getWorkQueue() { @@ -361,8 +373,14 @@ public class AbstractQueueVisitor { } private void recordError(Throwable e) { - catastrophe = e; try { + // If threadInterrupted is true, then RejectedExecutionExceptions are expected. There's no + // need to remember them, but there is a need to call decrementRemainingTasks, which is + // satisfied by the finally block below. + if (e instanceof RejectedExecutionException && threadInterrupted) { + return; + } + catastrophe = e; synchronized (this) { if (unhandled == null) { // save only the first one. unhandled = e; @@ -439,7 +457,6 @@ public class AbstractQueueVisitor { */ private void setInterrupted() { threadInterrupted = true; - setRejectedExecutionHandler(); } private final void decrementRemainingTasks() { @@ -559,7 +576,7 @@ public class AbstractQueueVisitor { } } - if (ownThreadPool) { + if (ownExecutorService) { pool.shutdown(); for (;;) { try { @@ -603,17 +620,6 @@ public class AbstractQueueVisitor { return isCritical; } - private void setRejectedExecutionHandler() { - if (ownThreadPool) { - pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - decrementRemainingTasks(); - } - }); - } - } - /** * If exception is critical then set a flag which signals * to stop all jobs inside {@link #awaitTermination(boolean)}. |