diff options
author | Mark Schaller <mschaller@google.com> | 2015-10-13 20:06:19 +0000 |
---|---|---|
committer | Florian Weikert <fwe@google.com> | 2015-10-13 21:13:27 +0000 |
commit | eff2b450bd44d019d3b23495e383cfce9e473fe6 (patch) | |
tree | 5ed1e0fffae45eee1153d7c36b34782cac76ab83 | |
parent | 5a9b69919927ee076ca0817da3489e43eb88d338 (diff) |
Allow other ExecutorService implementations in AbstractQueueVisitor
Previously, only ThreadPoolExecutor implementations were allowed.
--
MOS_MIGRATED_REVID=105340237
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java | 158 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java (renamed from src/main/java/com/google/devtools/build/lib/concurrent/ThreadPoolExecutorParams.java) | 30 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java | 3 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java | 19 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java | 11 | ||||
-rw-r--r-- | src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java | 1 | ||||
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java | 2 | ||||
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java | 2 | ||||
-rw-r--r-- | src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java | 8 | ||||
-rw-r--r-- | src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java | 19 |
10 files changed, 127 insertions, 126 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index 87f85e3aa6..08903acd2c 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -24,8 +24,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -33,9 +34,9 @@ import java.util.logging.Level; import java.util.logging.Logger; /** - * AbstractQueueVisitor is a wrapper around {@link ThreadPoolExecutor} which - * delays thread pool shutdown until entire visitation is complete. - * This is useful for cases in which worker tasks may submit additional tasks. + * AbstractQueueVisitor is a wrapper around {@link ExecutorService} which delays service shutdown + * until entire visitation is complete. This is useful for cases in which worker tasks may submit + * additional tasks. * * <p>Consider the following example: * <pre> @@ -56,14 +57,26 @@ import java.util.logging.Logger; public class AbstractQueueVisitor { /** - * Default factory function for constructing {@link ThreadPoolExecutor}s. + * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link + * ThreadPoolExecutor}s this creates have the same value for {@code corePoolSize} and {@code + * maximumPoolSize} because that results in a fixed-size thread pool, and the current use cases + * for {@link AbstractQueueVisitor} don't require any more sophisticated thread pool size + * management. + * + * <p>If client use cases change, they may invoke one of the {@link + * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link + * ThreadPoolExecutor}. */ - public static final Function<ThreadPoolExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY = - new Function<ThreadPoolExecutorParams, ThreadPoolExecutor>() { + public static final Function<ExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY = + new Function<ExecutorParams, ThreadPoolExecutor>() { @Override - public ThreadPoolExecutor apply(ThreadPoolExecutorParams p) { - return new ThreadPoolExecutor(p.getCorePoolSize(), p.getMaxPoolSize(), - p.getKeepAliveTime(), p.getUnits(), p.getWorkQueue(), + public ThreadPoolExecutor apply(ExecutorParams p) { + return new ThreadPoolExecutor( + /*corePoolSize=*/ p.getParallelism(), + /*maximumPoolSize=*/ p.getParallelism(), + p.getKeepAliveTime(), + p.getUnits(), + p.getWorkQueue(), new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build()); } }; @@ -80,10 +93,10 @@ public class AbstractQueueVisitor { private volatile Throwable unhandled = null; /** - * An uncaught exception when submitting a job to the ThreadPool is catastrophic, and usually - * indicates a lack of stack space on which to allocate a native thread. The JDK - * ThreadPoolExecutor may reach an inconsistent state in such circumstances, so we avoid blocking - * on its termination when this field is non-null. + * An uncaught exception when submitting a job to the {@link ExecutorService} is catastrophic, + * and usually indicates a lack of stack space on which to allocate a native thread. The {@link + * ExecutorService} may reach an inconsistent state in such circumstances, so we avoid blocking + * on its termination when this field is non-{@code null}. */ private volatile Throwable catastrophe; @@ -103,11 +116,10 @@ public class AbstractQueueVisitor { private final Map<Thread, Long> jobs = Maps.newConcurrentMap(); /** - * The thread pool. If !concurrent, always null. Created lazily on first - * call to {@link #enqueue(Runnable)}, and removed after call to - * {@link #work(boolean)}. + * The {@link ExecutorService}. If !{@code concurrent}, always {@code null}. Created lazily on + * first call to {@link #enqueue(Runnable)}, and removed after call to {@link #work(boolean)}. */ - private final ThreadPoolExecutor pool; + private final ExecutorService pool; /** * Flag used to record when the main thread (the thread which called @@ -136,10 +148,8 @@ public class AbstractQueueVisitor { */ private final boolean failFastOnInterrupt; - /** - * If true, we must shut down the thread pool on completion. - */ - private final boolean ownThreadPool; + /** If true, we must shut down the {@link ExecutorService} on completion. */ + private final boolean ownExecutorService; /** * Flag used to record when all threads were killed by failed action execution. @@ -155,10 +165,9 @@ public class AbstractQueueVisitor { * * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param failFastOnException if true, don't run new actions after @@ -167,10 +176,10 @@ public class AbstractQueueVisitor { * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. */ - public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, + public AbstractQueueVisitor(boolean concurrent, int parallelism, long keepAliveTime, TimeUnit units, boolean failFastOnException, boolean failFastOnInterrupt, String poolName) { - this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException, + this(concurrent, parallelism, keepAliveTime, units, failFastOnException, failFastOnInterrupt, poolName, EXECUTOR_FACTORY); } @@ -179,33 +188,39 @@ public class AbstractQueueVisitor { * * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param failFastOnException if true, don't run new actions after an uncaught exception. * @param failFastOnInterrupt if true, don't run new actions after interrupt. * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. - * @param executorFactory the factory for constructing the thread pool if {@code concurrent} is - * true. + * @param executorFactory the factory for constructing the executor service if {@code concurrent} + * is true. */ - public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, - long keepAliveTime, TimeUnit units, boolean failFastOnException, - boolean failFastOnInterrupt, String poolName, - Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) { + public AbstractQueueVisitor( + boolean concurrent, + int parallelism, + long keepAliveTime, + TimeUnit units, + boolean failFastOnException, + boolean failFastOnInterrupt, + String poolName, + Function<ExecutorParams, ? extends ExecutorService> executorFactory) { Preconditions.checkNotNull(poolName); Preconditions.checkNotNull(executorFactory); this.concurrent = concurrent; this.failFastOnException = failFastOnException; this.failFastOnInterrupt = failFastOnInterrupt; - this.ownThreadPool = true; - this.pool = concurrent - ? executorFactory.apply(new ThreadPoolExecutorParams(corePoolSize, maxPoolSize, - keepAliveTime, units, poolName, getWorkQueue())) - : null; + this.ownExecutorService = true; + this.pool = + concurrent + ? executorFactory.apply( + new ExecutorParams( + parallelism, keepAliveTime, units, poolName, getWorkQueue())) + : null; } /** @@ -213,10 +228,9 @@ public class AbstractQueueVisitor { * * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param failFastOnException if true, don't run new actions after @@ -224,9 +238,9 @@ public class AbstractQueueVisitor { * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. */ - public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, + public AbstractQueueVisitor(boolean concurrent, int parallelism, long keepAliveTime, TimeUnit units, boolean failFastOnException, String poolName) { - this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException, true, + this(concurrent, parallelism, keepAliveTime, units, failFastOnException, true, poolName); } @@ -268,7 +282,7 @@ public class AbstractQueueVisitor { this.failFastOnException = failFastOnException; this.failFastOnInterrupt = failFastOnInterrupt; this.pool = executor; - this.ownThreadPool = shutdownOnCompletion; + this.ownExecutorService = shutdownOnCompletion; } public AbstractQueueVisitor(ThreadPoolExecutor executor, boolean failFastOnException) { @@ -280,35 +294,33 @@ public class AbstractQueueVisitor { * * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. */ - public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, + public AbstractQueueVisitor(boolean concurrent, int parallelism, long keepAliveTime, TimeUnit units, String poolName) { - this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, false, poolName); + this(concurrent, parallelism, keepAliveTime, units, false, poolName); } /** * Create the AbstractQueueVisitor with concurrency enabled. * - * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, - * BlockingQueue)} - * @param maxPoolSize the max number of threads in the pool. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. * @param keepAlive the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default * thread naming will be used. */ - public AbstractQueueVisitor(int corePoolSize, int maxPoolSize, long keepAlive, TimeUnit units, + public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) { - this(true, corePoolSize, maxPoolSize, keepAlive, units, poolName); + this(true, parallelism, keepAlive, units, poolName); } protected BlockingQueue<Runnable> getWorkQueue() { @@ -361,8 +373,14 @@ public class AbstractQueueVisitor { } private void recordError(Throwable e) { - catastrophe = e; try { + // If threadInterrupted is true, then RejectedExecutionExceptions are expected. There's no + // need to remember them, but there is a need to call decrementRemainingTasks, which is + // satisfied by the finally block below. + if (e instanceof RejectedExecutionException && threadInterrupted) { + return; + } + catastrophe = e; synchronized (this) { if (unhandled == null) { // save only the first one. unhandled = e; @@ -439,7 +457,6 @@ public class AbstractQueueVisitor { */ private void setInterrupted() { threadInterrupted = true; - setRejectedExecutionHandler(); } private final void decrementRemainingTasks() { @@ -559,7 +576,7 @@ public class AbstractQueueVisitor { } } - if (ownThreadPool) { + if (ownExecutorService) { pool.shutdown(); for (;;) { try { @@ -603,17 +620,6 @@ public class AbstractQueueVisitor { return isCritical; } - private void setRejectedExecutionHandler() { - if (ownThreadPool) { - pool.setRejectedExecutionHandler(new RejectedExecutionHandler() { - @Override - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { - decrementRemainingTasks(); - } - }); - } - } - /** * If exception is critical then set a flag which signals * to stop all jobs inside {@link #awaitTermination(boolean)}. diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ThreadPoolExecutorParams.java b/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java index b19b29aa4e..c06144c8df 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/ThreadPoolExecutorParams.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/ExecutorParams.java @@ -15,36 +15,32 @@ package com.google.devtools.build.lib.concurrent; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -/** - * Configuration parameters for {@link ThreadPoolExecutor} construction. - */ -public class ThreadPoolExecutorParams { - private final int corePoolSize; - private final int maxPoolSize; +/** Configuration parameters for {@link ExecutorService} construction. */ +public class ExecutorParams { + private final int parallelism; private final long keepAliveTime; private final TimeUnit units; private final String poolName; private final BlockingQueue<Runnable> workQueue; - public ThreadPoolExecutorParams(int corePoolSize, int maxPoolSize, long keepAliveTime, - TimeUnit units, String poolName, BlockingQueue<Runnable> workQueue) { - this.corePoolSize = corePoolSize; - this.maxPoolSize = maxPoolSize; + public ExecutorParams( + int parallelism, + long keepAliveTime, + TimeUnit units, + String poolName, + BlockingQueue<Runnable> workQueue) { + this.parallelism = parallelism; this.keepAliveTime = keepAliveTime; this.units = units; this.poolName = poolName; this.workQueue = workQueue; } - public int getCorePoolSize() { - return corePoolSize; - } - - public int getMaxPoolSize() { - return maxPoolSize; + public int getParallelism() { + return parallelism; } public long getKeepAliveTime() { diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java index 4b6225079d..35e84d3b15 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java @@ -280,8 +280,7 @@ final class LabelVisitor { // Observing the loading phase of a typical large package (with all subpackages) shows // maximum thread-level concurrency of ~20. Limiting the total number of threads to 200 is // therefore conservative and should help us avoid hitting native limits. - super(CONCURRENT, parallelThreads, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing, - THREAD_NAME); + super(CONCURRENT, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing, THREAD_NAME); this.eventHandler = eventHandler; this.maxDepth = maxDepth; this.errorObserver = new TargetEdgeErrorObserver(); diff --git a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java index be6cf81310..ce76bc6996 100644 --- a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java +++ b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java @@ -15,12 +15,12 @@ package com.google.devtools.build.skyframe; import com.google.common.base.Function; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; -import com.google.devtools.build.lib.concurrent.ThreadPoolExecutorParams; +import com.google.devtools.build.lib.concurrent.ExecutorParams; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingNodeVisitor; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingNodeVisitor; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.InvalidationState; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import javax.annotation.Nullable; @@ -73,24 +73,13 @@ public final class EagerInvalidator { EvaluationProgressReceiver invalidationReceiver, InvalidationState state, DirtyKeyTracker dirtyKeyTracker, - Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) { + Function<ExecutorParams, ? extends ExecutorService> executorFactory) { state.update(diff); return state.isEmpty() ? null : new DirtyingNodeVisitor(graph, invalidationReceiver, state, dirtyKeyTracker, executorFactory); } - @Nullable - static DirtyingNodeVisitor createInvalidatingVisitorIfNeeded( - DirtiableGraph graph, - Iterable<SkyKey> diff, - EvaluationProgressReceiver invalidationReceiver, - InvalidationState state, - DirtyKeyTracker dirtyKeyTracker) { - return createInvalidatingVisitorIfNeeded(graph, diff, invalidationReceiver, state, - dirtyKeyTracker, AbstractQueueVisitor.EXECUTOR_FACTORY); - } - /** * Invalidates given values and their upward transitive closure in the graph, using an executor * constructed with the provided factory, if necessary. @@ -101,7 +90,7 @@ public final class EagerInvalidator { EvaluationProgressReceiver invalidationReceiver, InvalidationState state, DirtyKeyTracker dirtyKeyTracker, - Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) + Function<ExecutorParams, ? extends ExecutorService> executorFactory) throws InterruptedException { // If we are invalidating, we must be in an incremental build by definition, so we must // maintain a consistent graph state by traversing the graph and invalidating transitive diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java index 2b4fa85c71..459c7ffc7d 100644 --- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java @@ -21,13 +21,13 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; -import com.google.devtools.build.lib.concurrent.ThreadPoolExecutorParams; +import com.google.devtools.build.lib.concurrent.ExecutorParams; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.util.Pair; import java.util.Map; import java.util.Set; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -80,10 +80,9 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr @Nullable EvaluationProgressReceiver invalidationReceiver, InvalidationState state, DirtyKeyTracker dirtyKeyTracker, - Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) { + Function<ExecutorParams, ? extends ExecutorService> executorFactory) { super(/*concurrent=*/true, - /*corePoolSize=*/DEFAULT_THREAD_COUNT, - /*maxPoolSize=*/DEFAULT_THREAD_COUNT, + /*parallelism=*/DEFAULT_THREAD_COUNT, /*keepAliveTime=*/1, /*units=*/TimeUnit.SECONDS, /*failFastOnException=*/true, @@ -298,7 +297,7 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr EvaluationProgressReceiver invalidationReceiver, InvalidationState state, DirtyKeyTracker dirtyKeyTracker, - Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) { + Function<ExecutorParams, ? extends ExecutorService> executorFactory) { super(graph, invalidationReceiver, state, dirtyKeyTracker, executorFactory); } diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java index fc54c1f8fd..59ff33ad48 100644 --- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java +++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java @@ -600,7 +600,6 @@ public final class ParallelEvaluator implements Evaluator { private ValueVisitor(int threadCount) { super(/*concurrent*/true, threadCount, - threadCount, 1, TimeUnit.SECONDS, /*failFastOnException*/true, /*failFastOnInterrupt*/true, diff --git a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java index db004b2acc..d9fb1a3080 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java +++ b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java @@ -134,7 +134,7 @@ public class ConcurrentMultimapWithHeadElementTest { private final AtomicInteger actionCount = new AtomicInteger(0); private StressTester() { - super(/*concurrent=*/true, 200, 200, 1, TimeUnit.SECONDS, + super(/*concurrent=*/true, 200, 1, TimeUnit.SECONDS, /*failFastOnException=*/true, /*failFastOnInterrupt=*/true, "action-graph-test"); } diff --git a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java index 82fd413eee..6d5a23e665 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java +++ b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java @@ -81,7 +81,7 @@ public class MapBasedActionGraphTest { private final AtomicInteger actionCount = new AtomicInteger(0); private ActionRegisterer() { - super(/*concurrent=*/true, 200, 200, 1, TimeUnit.SECONDS, + super(/*concurrent=*/true, 200, 1, TimeUnit.SECONDS, /*failFastOnException=*/true, /*failFastOnInterrupt=*/true, "action-graph-test"); FileSystem fileSystem = new InMemoryFileSystem(BlazeClock.instance()); Path path = fileSystem.getPath("/root/foo"); diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java index cec348d2a2..19b4274267 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java @@ -474,7 +474,7 @@ public class AbstractQueueVisitorTest { private final Object lock = new Object(); public CountingQueueVisitor() { - super(5, 5, 3L, TimeUnit.SECONDS, THREAD_NAME); + super(5, 3L, TimeUnit.SECONDS, THREAD_NAME); } public CountingQueueVisitor(ThreadPoolExecutor executor) { @@ -505,15 +505,15 @@ public class AbstractQueueVisitorTest { private final static String THREAD_NAME = "BlazeTest ConcreteQueueVisitor"; public ConcreteQueueVisitor() { - super(5, 5, 3L, TimeUnit.SECONDS, THREAD_NAME); + super(5, 3L, TimeUnit.SECONDS, THREAD_NAME); } public ConcreteQueueVisitor(boolean failFast) { - super(true, 5, 5, 3L, TimeUnit.SECONDS, failFast, THREAD_NAME); + super(true, 5, 3L, TimeUnit.SECONDS, failFast, THREAD_NAME); } public ConcreteQueueVisitor(boolean failFast, boolean failFastOnInterrupt) { - super(true, 5, 5, 3L, TimeUnit.SECONDS, failFast, failFastOnInterrupt, THREAD_NAME); + super(true, 5, 3L, TimeUnit.SECONDS, failFast, failFastOnInterrupt, THREAD_NAME); } public ConcreteQueueVisitor(ThreadPoolExecutor executor, boolean failFast, diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java index 87f34a25d6..be7391f668 100644 --- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java +++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.testing.GcFinalization; +import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.events.Reporter; import com.google.devtools.build.lib.testutil.TestUtils; import com.google.devtools.build.lib.util.Pair; @@ -639,8 +640,15 @@ public class EagerInvalidatorTest { // Dirty the node, and ensure that the tracker is aware of it: Iterable<SkyKey> diff1 = ImmutableList.of(skyKey("a")); InvalidationState state1 = new DirtyingInvalidationState(); - Preconditions.checkNotNull(EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff1, - receiver, state1, dirtyKeyTracker)).run(); + Preconditions.checkNotNull( + EagerInvalidator.createInvalidatingVisitorIfNeeded( + graph, + diff1, + receiver, + state1, + dirtyKeyTracker, + AbstractQueueVisitor.EXECUTOR_FACTORY)) + .run(); assertThat(dirtyKeyTracker.getDirtyKeys()).containsExactly(skyKey("a"), skyKey("ab")); // Delete the node, and ensure that the tracker is no longer tracking it: @@ -662,7 +670,12 @@ public class EagerInvalidatorTest { Iterable<SkyKey> diff = ImmutableList.copyOf(keys); DirtyingNodeVisitor dirtyingNodeVisitor = EagerInvalidator.createInvalidatingVisitorIfNeeded( - graph, diff, invalidationReceiver, state, dirtyKeyTracker); + graph, + diff, + invalidationReceiver, + state, + dirtyKeyTracker, + AbstractQueueVisitor.EXECUTOR_FACTORY); if (dirtyingNodeVisitor != null) { visitor.set(dirtyingNodeVisitor); dirtyingNodeVisitor.run(); |