aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java111
1 files changed, 75 insertions, 36 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index c4e526ba97..1eb64979d2 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -14,15 +14,16 @@
package com.google.devtools.build.lib.concurrent;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification;
import java.util.Map;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -32,27 +33,6 @@ import java.util.logging.Logger;
/** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */
public class AbstractQueueVisitor implements QuiescingExecutor {
-
- /**
- * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link
- * ThreadPoolExecutor}s this creates have the same value for {@code corePoolSize} and {@code
- * maximumPoolSize} because that results in a fixed-size thread pool, and the current use cases
- * for {@link AbstractQueueVisitor} don't require any more sophisticated thread pool size
- * management.
- *
- * <p>If client use cases change, they may invoke one of the {@link
- * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link
- * ThreadPoolExecutor}.
- */
- public static final Function<ExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY =
- p ->
- new ThreadPoolExecutor(
- /*corePoolSize=*/ p.getParallelism(),
- /*maximumPoolSize=*/ p.getParallelism(),
- p.getKeepAliveTime(),
- p.getUnits(),
- p.getWorkQueue(),
- new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build());
/**
* The most severe unhandled exception thrown by a worker thread, according to {@link
* #errorClassifier}. This exception gets propagated to the calling thread of {@link
@@ -104,6 +84,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private final ExecutorService executorService;
+ private final boolean usingPriorityQueue;
+
/**
* Flag used to record when the main thread (the thread which called {@link #awaitQuiescence}) is
* interrupted.
@@ -131,21 +113,55 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName());
+ /**
+ * Default function for constructing {@link ThreadPoolExecutor}s. The {@link ThreadPoolExecutor}s
+ * this creates have the same value for {@code corePoolSize} and {@code maximumPoolSize} because
+ * that results in a fixed-size thread pool, and the current use cases for {@link
+ * AbstractQueueVisitor} don't require any more sophisticated thread pool size management.
+ *
+ * <p>If client use cases change, they may invoke one of the {@link
+ * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link
+ * ThreadPoolExecutor}.
+ */
private static ExecutorService createExecutorService(
int parallelism,
long keepAliveTime,
TimeUnit units,
+ BlockingQueue<Runnable> workQueue,
+ String poolName) {
+ return new ThreadPoolExecutor(
+ /*corePoolSize=*/ parallelism,
+ /*maximumPoolSize=*/ parallelism,
+ keepAliveTime,
+ units,
+ workQueue,
+ new ThreadFactoryBuilder()
+ .setNameFormat(Preconditions.checkNotNull(poolName) + " %d")
+ .build());
+ }
+
+ /**
+ * Creates an {@link AbstractQueueVisitor}, similar to {@link #AbstractQueueVisitor(int, long,
+ * TimeUnit, boolean, String, ErrorClassifier)}, but whose work is ordered by a {@link
+ * PriorityBlockingQueue}. The {@link Runnable} objects submitted to {@link #execute(Runnable)}
+ * must implement {@link Comparable}.
+ */
+ public static AbstractQueueVisitor createWithPriorityQueue(
+ int parallelism,
+ long keepAliveTime,
+ TimeUnit units,
+ boolean failFastOnException,
String poolName,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
- return Preconditions.checkNotNull(executorFactory)
- .apply(
- new ExecutorParams(
- parallelism,
- keepAliveTime,
- units,
- Preconditions.checkNotNull(poolName),
- new BlockingStack<Runnable>()));
+ ErrorClassifier errorClassifier) {
+ return new AbstractQueueVisitor(
+ createExecutorService(
+ parallelism, keepAliveTime, units, new PriorityBlockingQueue<>(), poolName),
+ true,
+ failFastOnException,
+ errorClassifier,
+ /*usingPriorityQueue=*/ true);
}
+
/**
* Create the {@link AbstractQueueVisitor}.
*
@@ -157,7 +173,6 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
* @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
* null}, default thread naming will be used.
- * @param executorFactory the factory for constructing the executor service.
* @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
*/
public AbstractQueueVisitor(
@@ -166,10 +181,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
TimeUnit units,
boolean failFastOnException,
String poolName,
- Function<ExecutorParams, ? extends ExecutorService> executorFactory,
ErrorClassifier errorClassifier) {
this(
- createExecutorService(parallelism, keepAliveTime, units, poolName, executorFactory),
+ createExecutorService(parallelism, keepAliveTime, units, new BlockingStack<>(), poolName),
true,
failFastOnException,
errorClassifier);
@@ -190,10 +204,25 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
boolean shutdownOnCompletion,
boolean failFastOnException,
ErrorClassifier errorClassifier) {
+ this(
+ executorService,
+ shutdownOnCompletion,
+ failFastOnException,
+ errorClassifier,
+ /*usingPriorityQueue=*/ false);
+ }
+
+ private AbstractQueueVisitor(
+ ExecutorService executorService,
+ boolean shutdownOnCompletion,
+ boolean failFastOnException,
+ ErrorClassifier errorClassifier,
+ boolean usingPriorityQueue) {
this.failFastOnException = failFastOnException;
this.ownExecutorService = shutdownOnCompletion;
this.executorService = Preconditions.checkNotNull(executorService);
this.errorClassifier = Preconditions.checkNotNull(errorClassifier);
+ this.usingPriorityQueue = usingPriorityQueue;
}
@Override
@@ -218,6 +247,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
/** Schedules a call. Called in a worker thread. */
@Override
public final void execute(Runnable runnable) {
+ if (usingPriorityQueue) {
+ Preconditions.checkState(runnable instanceof Comparable);
+ }
WrappedRunnable wrappedRunnable = new WrappedRunnable(runnable);
try {
// It's impossible for this increment to result in remainingTasks.get <= 0 because
@@ -239,7 +271,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- protected void executeRunnable(Runnable runnable) {
+ protected void executeRunnable(WrappedRunnable runnable) {
executorService.execute(runnable);
}
@@ -309,7 +341,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* <li>And, lastly, calls {@link #decrementRemainingTasks}.
* </ul>
*/
- private final class WrappedRunnable implements Runnable {
+ protected final class WrappedRunnable implements Runnable, Comparable<WrappedRunnable> {
private final Runnable originalRunnable;
private volatile boolean ran;
@@ -345,6 +377,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public int compareTo(WrappedRunnable o) {
+ // This should only be called when the concrete class is submitting comparable runnables.
+ return ((Comparable) originalRunnable).compareTo(o.originalRunnable);
+ }
}
private void addJob(Thread thread) {