aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java158
1 files changed, 82 insertions, 76 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 87f85e3aa6..08903acd2c 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -24,8 +24,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,9 +34,9 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * AbstractQueueVisitor is a wrapper around {@link ThreadPoolExecutor} which
- * delays thread pool shutdown until entire visitation is complete.
- * This is useful for cases in which worker tasks may submit additional tasks.
+ * AbstractQueueVisitor is a wrapper around {@link ExecutorService} which delays service shutdown
+ * until entire visitation is complete. This is useful for cases in which worker tasks may submit
+ * additional tasks.
*
* <p>Consider the following example:
* <pre>
@@ -56,14 +57,26 @@ import java.util.logging.Logger;
public class AbstractQueueVisitor {
/**
- * Default factory function for constructing {@link ThreadPoolExecutor}s.
+ * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link
+ * ThreadPoolExecutor}s this creates have the same value for {@code corePoolSize} and {@code
+ * maximumPoolSize} because that results in a fixed-size thread pool, and the current use cases
+ * for {@link AbstractQueueVisitor} don't require any more sophisticated thread pool size
+ * management.
+ *
+ * <p>If client use cases change, they may invoke one of the {@link
+ * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link
+ * ThreadPoolExecutor}.
*/
- public static final Function<ThreadPoolExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY =
- new Function<ThreadPoolExecutorParams, ThreadPoolExecutor>() {
+ public static final Function<ExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY =
+ new Function<ExecutorParams, ThreadPoolExecutor>() {
@Override
- public ThreadPoolExecutor apply(ThreadPoolExecutorParams p) {
- return new ThreadPoolExecutor(p.getCorePoolSize(), p.getMaxPoolSize(),
- p.getKeepAliveTime(), p.getUnits(), p.getWorkQueue(),
+ public ThreadPoolExecutor apply(ExecutorParams p) {
+ return new ThreadPoolExecutor(
+ /*corePoolSize=*/ p.getParallelism(),
+ /*maximumPoolSize=*/ p.getParallelism(),
+ p.getKeepAliveTime(),
+ p.getUnits(),
+ p.getWorkQueue(),
new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build());
}
};
@@ -80,10 +93,10 @@ public class AbstractQueueVisitor {
private volatile Throwable unhandled = null;
/**
- * An uncaught exception when submitting a job to the ThreadPool is catastrophic, and usually
- * indicates a lack of stack space on which to allocate a native thread. The JDK
- * ThreadPoolExecutor may reach an inconsistent state in such circumstances, so we avoid blocking
- * on its termination when this field is non-null.
+ * An uncaught exception when submitting a job to the {@link ExecutorService} is catastrophic,
+ * and usually indicates a lack of stack space on which to allocate a native thread. The {@link
+ * ExecutorService} may reach an inconsistent state in such circumstances, so we avoid blocking
+ * on its termination when this field is non-{@code null}.
*/
private volatile Throwable catastrophe;
@@ -103,11 +116,10 @@ public class AbstractQueueVisitor {
private final Map<Thread, Long> jobs = Maps.newConcurrentMap();
/**
- * The thread pool. If !concurrent, always null. Created lazily on first
- * call to {@link #enqueue(Runnable)}, and removed after call to
- * {@link #work(boolean)}.
+ * The {@link ExecutorService}. If !{@code concurrent}, always {@code null}. Created lazily on
+ * first call to {@link #enqueue(Runnable)}, and removed after call to {@link #work(boolean)}.
*/
- private final ThreadPoolExecutor pool;
+ private final ExecutorService pool;
/**
* Flag used to record when the main thread (the thread which called
@@ -136,10 +148,8 @@ public class AbstractQueueVisitor {
*/
private final boolean failFastOnInterrupt;
- /**
- * If true, we must shut down the thread pool on completion.
- */
- private final boolean ownThreadPool;
+ /** If true, we must shut down the {@link ExecutorService} on completion. */
+ private final boolean ownExecutorService;
/**
* Flag used to record when all threads were killed by failed action execution.
@@ -155,10 +165,9 @@ public class AbstractQueueVisitor {
*
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param failFastOnException if true, don't run new actions after
@@ -167,10 +176,10 @@ public class AbstractQueueVisitor {
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
*/
- public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
+ public AbstractQueueVisitor(boolean concurrent, int parallelism,
long keepAliveTime, TimeUnit units, boolean failFastOnException,
boolean failFastOnInterrupt, String poolName) {
- this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException,
+ this(concurrent, parallelism, keepAliveTime, units, failFastOnException,
failFastOnInterrupt, poolName, EXECUTOR_FACTORY);
}
@@ -179,33 +188,39 @@ public class AbstractQueueVisitor {
*
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param failFastOnException if true, don't run new actions after an uncaught exception.
* @param failFastOnInterrupt if true, don't run new actions after interrupt.
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
- * @param executorFactory the factory for constructing the thread pool if {@code concurrent} is
- * true.
+ * @param executorFactory the factory for constructing the executor service if {@code concurrent}
+ * is true.
*/
- public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
- long keepAliveTime, TimeUnit units, boolean failFastOnException,
- boolean failFastOnInterrupt, String poolName,
- Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) {
+ public AbstractQueueVisitor(
+ boolean concurrent,
+ int parallelism,
+ long keepAliveTime,
+ TimeUnit units,
+ boolean failFastOnException,
+ boolean failFastOnInterrupt,
+ String poolName,
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
Preconditions.checkNotNull(poolName);
Preconditions.checkNotNull(executorFactory);
this.concurrent = concurrent;
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
- this.ownThreadPool = true;
- this.pool = concurrent
- ? executorFactory.apply(new ThreadPoolExecutorParams(corePoolSize, maxPoolSize,
- keepAliveTime, units, poolName, getWorkQueue()))
- : null;
+ this.ownExecutorService = true;
+ this.pool =
+ concurrent
+ ? executorFactory.apply(
+ new ExecutorParams(
+ parallelism, keepAliveTime, units, poolName, getWorkQueue()))
+ : null;
}
/**
@@ -213,10 +228,9 @@ public class AbstractQueueVisitor {
*
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param failFastOnException if true, don't run new actions after
@@ -224,9 +238,9 @@ public class AbstractQueueVisitor {
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
*/
- public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
+ public AbstractQueueVisitor(boolean concurrent, int parallelism,
long keepAliveTime, TimeUnit units, boolean failFastOnException, String poolName) {
- this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException, true,
+ this(concurrent, parallelism, keepAliveTime, units, failFastOnException, true,
poolName);
}
@@ -268,7 +282,7 @@ public class AbstractQueueVisitor {
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
this.pool = executor;
- this.ownThreadPool = shutdownOnCompletion;
+ this.ownExecutorService = shutdownOnCompletion;
}
public AbstractQueueVisitor(ThreadPoolExecutor executor, boolean failFastOnException) {
@@ -280,35 +294,33 @@ public class AbstractQueueVisitor {
*
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
*/
- public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
+ public AbstractQueueVisitor(boolean concurrent, int parallelism,
long keepAliveTime, TimeUnit units, String poolName) {
- this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, false, poolName);
+ this(concurrent, parallelism, keepAliveTime, units, false, poolName);
}
/**
* Create the AbstractQueueVisitor with concurrency enabled.
*
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAlive the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
*/
- public AbstractQueueVisitor(int corePoolSize, int maxPoolSize, long keepAlive, TimeUnit units,
+ public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units,
String poolName) {
- this(true, corePoolSize, maxPoolSize, keepAlive, units, poolName);
+ this(true, parallelism, keepAlive, units, poolName);
}
protected BlockingQueue<Runnable> getWorkQueue() {
@@ -361,8 +373,14 @@ public class AbstractQueueVisitor {
}
private void recordError(Throwable e) {
- catastrophe = e;
try {
+ // If threadInterrupted is true, then RejectedExecutionExceptions are expected. There's no
+ // need to remember them, but there is a need to call decrementRemainingTasks, which is
+ // satisfied by the finally block below.
+ if (e instanceof RejectedExecutionException && threadInterrupted) {
+ return;
+ }
+ catastrophe = e;
synchronized (this) {
if (unhandled == null) { // save only the first one.
unhandled = e;
@@ -439,7 +457,6 @@ public class AbstractQueueVisitor {
*/
private void setInterrupted() {
threadInterrupted = true;
- setRejectedExecutionHandler();
}
private final void decrementRemainingTasks() {
@@ -559,7 +576,7 @@ public class AbstractQueueVisitor {
}
}
- if (ownThreadPool) {
+ if (ownExecutorService) {
pool.shutdown();
for (;;) {
try {
@@ -603,17 +620,6 @@ public class AbstractQueueVisitor {
return isCritical;
}
- private void setRejectedExecutionHandler() {
- if (ownThreadPool) {
- pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- decrementRemainingTasks();
- }
- });
- }
- }
-
/**
* If exception is critical then set a flag which signals
* to stop all jobs inside {@link #awaitTermination(boolean)}.