aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2015-10-13 20:06:19 +0000
committerGravatar Florian Weikert <fwe@google.com>2015-10-13 21:13:27 +0000
commiteff2b450bd44d019d3b23495e383cfce9e473fe6 (patch)
tree5ed1e0fffae45eee1153d7c36b34782cac76ab83
parent5a9b69919927ee076ca0817da3489e43eb88d338 (diff)
Allow other ExecutorService implementations in AbstractQueueVisitor
Previously, only ThreadPoolExecutor implementations were allowed. -- MOS_MIGRATED_REVID=105340237
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java158
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java (renamed from src/main/java/com/google/devtools/build/lib/concurrent/ThreadPoolExecutorParams.java)30
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java3
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java19
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java11
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java1
-rw-r--r--src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java2
-rw-r--r--src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java2
-rw-r--r--src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java8
-rw-r--r--src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java19
10 files changed, 127 insertions, 126 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 87f85e3aa6..08903acd2c 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -24,8 +24,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -33,9 +34,9 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * AbstractQueueVisitor is a wrapper around {@link ThreadPoolExecutor} which
- * delays thread pool shutdown until entire visitation is complete.
- * This is useful for cases in which worker tasks may submit additional tasks.
+ * AbstractQueueVisitor is a wrapper around {@link ExecutorService} which delays service shutdown
+ * until entire visitation is complete. This is useful for cases in which worker tasks may submit
+ * additional tasks.
*
* <p>Consider the following example:
* <pre>
@@ -56,14 +57,26 @@ import java.util.logging.Logger;
public class AbstractQueueVisitor {
/**
- * Default factory function for constructing {@link ThreadPoolExecutor}s.
+ * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link
+ * ThreadPoolExecutor}s this creates have the same value for {@code corePoolSize} and {@code
+ * maximumPoolSize} because that results in a fixed-size thread pool, and the current use cases
+ * for {@link AbstractQueueVisitor} don't require any more sophisticated thread pool size
+ * management.
+ *
+ * <p>If client use cases change, they may invoke one of the {@link
+ * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link
+ * ThreadPoolExecutor}.
*/
- public static final Function<ThreadPoolExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY =
- new Function<ThreadPoolExecutorParams, ThreadPoolExecutor>() {
+ public static final Function<ExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY =
+ new Function<ExecutorParams, ThreadPoolExecutor>() {
@Override
- public ThreadPoolExecutor apply(ThreadPoolExecutorParams p) {
- return new ThreadPoolExecutor(p.getCorePoolSize(), p.getMaxPoolSize(),
- p.getKeepAliveTime(), p.getUnits(), p.getWorkQueue(),
+ public ThreadPoolExecutor apply(ExecutorParams p) {
+ return new ThreadPoolExecutor(
+ /*corePoolSize=*/ p.getParallelism(),
+ /*maximumPoolSize=*/ p.getParallelism(),
+ p.getKeepAliveTime(),
+ p.getUnits(),
+ p.getWorkQueue(),
new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build());
}
};
@@ -80,10 +93,10 @@ public class AbstractQueueVisitor {
private volatile Throwable unhandled = null;
/**
- * An uncaught exception when submitting a job to the ThreadPool is catastrophic, and usually
- * indicates a lack of stack space on which to allocate a native thread. The JDK
- * ThreadPoolExecutor may reach an inconsistent state in such circumstances, so we avoid blocking
- * on its termination when this field is non-null.
+ * An uncaught exception when submitting a job to the {@link ExecutorService} is catastrophic,
+ * and usually indicates a lack of stack space on which to allocate a native thread. The {@link
+ * ExecutorService} may reach an inconsistent state in such circumstances, so we avoid blocking
+ * on its termination when this field is non-{@code null}.
*/
private volatile Throwable catastrophe;
@@ -103,11 +116,10 @@ public class AbstractQueueVisitor {
private final Map<Thread, Long> jobs = Maps.newConcurrentMap();
/**
- * The thread pool. If !concurrent, always null. Created lazily on first
- * call to {@link #enqueue(Runnable)}, and removed after call to
- * {@link #work(boolean)}.
+ * The {@link ExecutorService}. If !{@code concurrent}, always {@code null}. Created lazily on
+ * first call to {@link #enqueue(Runnable)}, and removed after call to {@link #work(boolean)}.
*/
- private final ThreadPoolExecutor pool;
+ private final ExecutorService pool;
/**
* Flag used to record when the main thread (the thread which called
@@ -136,10 +148,8 @@ public class AbstractQueueVisitor {
*/
private final boolean failFastOnInterrupt;
- /**
- * If true, we must shut down the thread pool on completion.
- */
- private final boolean ownThreadPool;
+ /** If true, we must shut down the {@link ExecutorService} on completion. */
+ private final boolean ownExecutorService;
/**
* Flag used to record when all threads were killed by failed action execution.
@@ -155,10 +165,9 @@ public class AbstractQueueVisitor {
*
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param failFastOnException if true, don't run new actions after
@@ -167,10 +176,10 @@ public class AbstractQueueVisitor {
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
*/
- public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
+ public AbstractQueueVisitor(boolean concurrent, int parallelism,
long keepAliveTime, TimeUnit units, boolean failFastOnException,
boolean failFastOnInterrupt, String poolName) {
- this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException,
+ this(concurrent, parallelism, keepAliveTime, units, failFastOnException,
failFastOnInterrupt, poolName, EXECUTOR_FACTORY);
}
@@ -179,33 +188,39 @@ public class AbstractQueueVisitor {
*
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param failFastOnException if true, don't run new actions after an uncaught exception.
* @param failFastOnInterrupt if true, don't run new actions after interrupt.
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
- * @param executorFactory the factory for constructing the thread pool if {@code concurrent} is
- * true.
+ * @param executorFactory the factory for constructing the executor service if {@code concurrent}
+ * is true.
*/
- public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
- long keepAliveTime, TimeUnit units, boolean failFastOnException,
- boolean failFastOnInterrupt, String poolName,
- Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) {
+ public AbstractQueueVisitor(
+ boolean concurrent,
+ int parallelism,
+ long keepAliveTime,
+ TimeUnit units,
+ boolean failFastOnException,
+ boolean failFastOnInterrupt,
+ String poolName,
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
Preconditions.checkNotNull(poolName);
Preconditions.checkNotNull(executorFactory);
this.concurrent = concurrent;
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
- this.ownThreadPool = true;
- this.pool = concurrent
- ? executorFactory.apply(new ThreadPoolExecutorParams(corePoolSize, maxPoolSize,
- keepAliveTime, units, poolName, getWorkQueue()))
- : null;
+ this.ownExecutorService = true;
+ this.pool =
+ concurrent
+ ? executorFactory.apply(
+ new ExecutorParams(
+ parallelism, keepAliveTime, units, poolName, getWorkQueue()))
+ : null;
}
/**
@@ -213,10 +228,9 @@ public class AbstractQueueVisitor {
*
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param failFastOnException if true, don't run new actions after
@@ -224,9 +238,9 @@ public class AbstractQueueVisitor {
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
*/
- public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
+ public AbstractQueueVisitor(boolean concurrent, int parallelism,
long keepAliveTime, TimeUnit units, boolean failFastOnException, String poolName) {
- this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException, true,
+ this(concurrent, parallelism, keepAliveTime, units, failFastOnException, true,
poolName);
}
@@ -268,7 +282,7 @@ public class AbstractQueueVisitor {
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
this.pool = executor;
- this.ownThreadPool = shutdownOnCompletion;
+ this.ownExecutorService = shutdownOnCompletion;
}
public AbstractQueueVisitor(ThreadPoolExecutor executor, boolean failFastOnException) {
@@ -280,35 +294,33 @@ public class AbstractQueueVisitor {
*
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
*/
- public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
+ public AbstractQueueVisitor(boolean concurrent, int parallelism,
long keepAliveTime, TimeUnit units, String poolName) {
- this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, false, poolName);
+ this(concurrent, parallelism, keepAliveTime, units, false, poolName);
}
/**
* Create the AbstractQueueVisitor with concurrency enabled.
*
- * @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
- * BlockingQueue)}
- * @param maxPoolSize the max number of threads in the pool.
+ * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
+ * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
+ * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
* @param keepAlive the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
* @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
* thread naming will be used.
*/
- public AbstractQueueVisitor(int corePoolSize, int maxPoolSize, long keepAlive, TimeUnit units,
+ public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units,
String poolName) {
- this(true, corePoolSize, maxPoolSize, keepAlive, units, poolName);
+ this(true, parallelism, keepAlive, units, poolName);
}
protected BlockingQueue<Runnable> getWorkQueue() {
@@ -361,8 +373,14 @@ public class AbstractQueueVisitor {
}
private void recordError(Throwable e) {
- catastrophe = e;
try {
+ // If threadInterrupted is true, then RejectedExecutionExceptions are expected. There's no
+ // need to remember them, but there is a need to call decrementRemainingTasks, which is
+ // satisfied by the finally block below.
+ if (e instanceof RejectedExecutionException && threadInterrupted) {
+ return;
+ }
+ catastrophe = e;
synchronized (this) {
if (unhandled == null) { // save only the first one.
unhandled = e;
@@ -439,7 +457,6 @@ public class AbstractQueueVisitor {
*/
private void setInterrupted() {
threadInterrupted = true;
- setRejectedExecutionHandler();
}
private final void decrementRemainingTasks() {
@@ -559,7 +576,7 @@ public class AbstractQueueVisitor {
}
}
- if (ownThreadPool) {
+ if (ownExecutorService) {
pool.shutdown();
for (;;) {
try {
@@ -603,17 +620,6 @@ public class AbstractQueueVisitor {
return isCritical;
}
- private void setRejectedExecutionHandler() {
- if (ownThreadPool) {
- pool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- decrementRemainingTasks();
- }
- });
- }
- }
-
/**
* If exception is critical then set a flag which signals
* to stop all jobs inside {@link #awaitTermination(boolean)}.
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ThreadPoolExecutorParams.java b/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java
index b19b29aa4e..c06144c8df 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/ThreadPoolExecutorParams.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java
@@ -15,36 +15,32 @@
package com.google.devtools.build.lib.concurrent;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-/**
- * Configuration parameters for {@link ThreadPoolExecutor} construction.
- */
-public class ThreadPoolExecutorParams {
- private final int corePoolSize;
- private final int maxPoolSize;
+/** Configuration parameters for {@link ExecutorService} construction. */
+public class ExecutorParams {
+ private final int parallelism;
private final long keepAliveTime;
private final TimeUnit units;
private final String poolName;
private final BlockingQueue<Runnable> workQueue;
- public ThreadPoolExecutorParams(int corePoolSize, int maxPoolSize, long keepAliveTime,
- TimeUnit units, String poolName, BlockingQueue<Runnable> workQueue) {
- this.corePoolSize = corePoolSize;
- this.maxPoolSize = maxPoolSize;
+ public ExecutorParams(
+ int parallelism,
+ long keepAliveTime,
+ TimeUnit units,
+ String poolName,
+ BlockingQueue<Runnable> workQueue) {
+ this.parallelism = parallelism;
this.keepAliveTime = keepAliveTime;
this.units = units;
this.poolName = poolName;
this.workQueue = workQueue;
}
- public int getCorePoolSize() {
- return corePoolSize;
- }
-
- public int getMaxPoolSize() {
- return maxPoolSize;
+ public int getParallelism() {
+ return parallelism;
}
public long getKeepAliveTime() {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
index 4b6225079d..35e84d3b15 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
@@ -280,8 +280,7 @@ final class LabelVisitor {
// Observing the loading phase of a typical large package (with all subpackages) shows
// maximum thread-level concurrency of ~20. Limiting the total number of threads to 200 is
// therefore conservative and should help us avoid hitting native limits.
- super(CONCURRENT, parallelThreads, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing,
- THREAD_NAME);
+ super(CONCURRENT, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing, THREAD_NAME);
this.eventHandler = eventHandler;
this.maxDepth = maxDepth;
this.errorObserver = new TargetEdgeErrorObserver();
diff --git a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
index be6cf81310..ce76bc6996 100644
--- a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
@@ -15,12 +15,12 @@ package com.google.devtools.build.skyframe;
import com.google.common.base.Function;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
-import com.google.devtools.build.lib.concurrent.ThreadPoolExecutorParams;
+import com.google.devtools.build.lib.concurrent.ExecutorParams;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingNodeVisitor;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingNodeVisitor;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.InvalidationState;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
@@ -73,24 +73,13 @@ public final class EagerInvalidator {
EvaluationProgressReceiver invalidationReceiver,
InvalidationState state,
DirtyKeyTracker dirtyKeyTracker,
- Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) {
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
state.update(diff);
return state.isEmpty() ? null
: new DirtyingNodeVisitor(graph, invalidationReceiver, state, dirtyKeyTracker,
executorFactory);
}
- @Nullable
- static DirtyingNodeVisitor createInvalidatingVisitorIfNeeded(
- DirtiableGraph graph,
- Iterable<SkyKey> diff,
- EvaluationProgressReceiver invalidationReceiver,
- InvalidationState state,
- DirtyKeyTracker dirtyKeyTracker) {
- return createInvalidatingVisitorIfNeeded(graph, diff, invalidationReceiver, state,
- dirtyKeyTracker, AbstractQueueVisitor.EXECUTOR_FACTORY);
- }
-
/**
* Invalidates given values and their upward transitive closure in the graph, using an executor
* constructed with the provided factory, if necessary.
@@ -101,7 +90,7 @@ public final class EagerInvalidator {
EvaluationProgressReceiver invalidationReceiver,
InvalidationState state,
DirtyKeyTracker dirtyKeyTracker,
- Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory)
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory)
throws InterruptedException {
// If we are invalidating, we must be in an incremental build by definition, so we must
// maintain a consistent graph state by traversing the graph and invalidating transitive
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index 2b4fa85c71..459c7ffc7d 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -21,13 +21,13 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
-import com.google.devtools.build.lib.concurrent.ThreadPoolExecutorParams;
+import com.google.devtools.build.lib.concurrent.ExecutorParams;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.Pair;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -80,10 +80,9 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
@Nullable EvaluationProgressReceiver invalidationReceiver,
InvalidationState state,
DirtyKeyTracker dirtyKeyTracker,
- Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) {
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
super(/*concurrent=*/true,
- /*corePoolSize=*/DEFAULT_THREAD_COUNT,
- /*maxPoolSize=*/DEFAULT_THREAD_COUNT,
+ /*parallelism=*/DEFAULT_THREAD_COUNT,
/*keepAliveTime=*/1,
/*units=*/TimeUnit.SECONDS,
/*failFastOnException=*/true,
@@ -298,7 +297,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr
EvaluationProgressReceiver invalidationReceiver,
InvalidationState state,
DirtyKeyTracker dirtyKeyTracker,
- Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) {
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
super(graph, invalidationReceiver, state, dirtyKeyTracker, executorFactory);
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
index fc54c1f8fd..59ff33ad48 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
@@ -600,7 +600,6 @@ public final class ParallelEvaluator implements Evaluator {
private ValueVisitor(int threadCount) {
super(/*concurrent*/true,
threadCount,
- threadCount,
1, TimeUnit.SECONDS,
/*failFastOnException*/true,
/*failFastOnInterrupt*/true,
diff --git a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java
index db004b2acc..d9fb1a3080 100644
--- a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java
+++ b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java
@@ -134,7 +134,7 @@ public class ConcurrentMultimapWithHeadElementTest {
private final AtomicInteger actionCount = new AtomicInteger(0);
private StressTester() {
- super(/*concurrent=*/true, 200, 200, 1, TimeUnit.SECONDS,
+ super(/*concurrent=*/true, 200, 1, TimeUnit.SECONDS,
/*failFastOnException=*/true, /*failFastOnInterrupt=*/true, "action-graph-test");
}
diff --git a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java
index 82fd413eee..6d5a23e665 100644
--- a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java
+++ b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java
@@ -81,7 +81,7 @@ public class MapBasedActionGraphTest {
private final AtomicInteger actionCount = new AtomicInteger(0);
private ActionRegisterer() {
- super(/*concurrent=*/true, 200, 200, 1, TimeUnit.SECONDS,
+ super(/*concurrent=*/true, 200, 1, TimeUnit.SECONDS,
/*failFastOnException=*/true, /*failFastOnInterrupt=*/true, "action-graph-test");
FileSystem fileSystem = new InMemoryFileSystem(BlazeClock.instance());
Path path = fileSystem.getPath("/root/foo");
diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
index cec348d2a2..19b4274267 100644
--- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
+++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java
@@ -474,7 +474,7 @@ public class AbstractQueueVisitorTest {
private final Object lock = new Object();
public CountingQueueVisitor() {
- super(5, 5, 3L, TimeUnit.SECONDS, THREAD_NAME);
+ super(5, 3L, TimeUnit.SECONDS, THREAD_NAME);
}
public CountingQueueVisitor(ThreadPoolExecutor executor) {
@@ -505,15 +505,15 @@ public class AbstractQueueVisitorTest {
private final static String THREAD_NAME = "BlazeTest ConcreteQueueVisitor";
public ConcreteQueueVisitor() {
- super(5, 5, 3L, TimeUnit.SECONDS, THREAD_NAME);
+ super(5, 3L, TimeUnit.SECONDS, THREAD_NAME);
}
public ConcreteQueueVisitor(boolean failFast) {
- super(true, 5, 5, 3L, TimeUnit.SECONDS, failFast, THREAD_NAME);
+ super(true, 5, 3L, TimeUnit.SECONDS, failFast, THREAD_NAME);
}
public ConcreteQueueVisitor(boolean failFast, boolean failFastOnInterrupt) {
- super(true, 5, 5, 3L, TimeUnit.SECONDS, failFast, failFastOnInterrupt, THREAD_NAME);
+ super(true, 5, 3L, TimeUnit.SECONDS, failFast, failFastOnInterrupt, THREAD_NAME);
}
public ConcreteQueueVisitor(ThreadPoolExecutor executor, boolean failFast,
diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
index 87f34a25d6..be7391f668 100644
--- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
@@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.testing.GcFinalization;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.events.Reporter;
import com.google.devtools.build.lib.testutil.TestUtils;
import com.google.devtools.build.lib.util.Pair;
@@ -639,8 +640,15 @@ public class EagerInvalidatorTest {
// Dirty the node, and ensure that the tracker is aware of it:
Iterable<SkyKey> diff1 = ImmutableList.of(skyKey("a"));
InvalidationState state1 = new DirtyingInvalidationState();
- Preconditions.checkNotNull(EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff1,
- receiver, state1, dirtyKeyTracker)).run();
+ Preconditions.checkNotNull(
+ EagerInvalidator.createInvalidatingVisitorIfNeeded(
+ graph,
+ diff1,
+ receiver,
+ state1,
+ dirtyKeyTracker,
+ AbstractQueueVisitor.EXECUTOR_FACTORY))
+ .run();
assertThat(dirtyKeyTracker.getDirtyKeys()).containsExactly(skyKey("a"), skyKey("ab"));
// Delete the node, and ensure that the tracker is no longer tracking it:
@@ -662,7 +670,12 @@ public class EagerInvalidatorTest {
Iterable<SkyKey> diff = ImmutableList.copyOf(keys);
DirtyingNodeVisitor dirtyingNodeVisitor =
EagerInvalidator.createInvalidatingVisitorIfNeeded(
- graph, diff, invalidationReceiver, state, dirtyKeyTracker);
+ graph,
+ diff,
+ invalidationReceiver,
+ state,
+ dirtyKeyTracker,
+ AbstractQueueVisitor.EXECUTOR_FACTORY);
if (dirtyingNodeVisitor != null) {
visitor.set(dirtyingNodeVisitor);
dirtyingNodeVisitor.run();