From e3c88deb76d1a24a6925be80ea49ab9058069d02 Mon Sep 17 00:00:00 2001 From: Mark Schaller Date: Wed, 5 Aug 2015 22:02:59 +0000 Subject: Add threadpool injectability to invalidator -- MOS_MIGRATED_REVID=99961435 --- .../build/lib/concurrent/AbstractQueueVisitor.java | 104 +++++++++++++++++++-- .../devtools/build/skyframe/EagerInvalidator.java | 69 ++++++++++---- .../build/skyframe/InvalidatingNodeVisitor.java | 29 ++++-- .../build/skyframe/EagerInvalidatorTest.java | 25 ++--- 4 files changed, 180 insertions(+), 47 deletions(-) (limited to 'src') diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index a937d178dd..c7ba0f7508 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -15,6 +15,7 @@ package com.google.devtools.build.lib.concurrent; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Maps; @@ -52,6 +53,65 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class AbstractQueueVisitor { + /** + * Configuration parameters for {@link ThreadPoolExecutor} construction. + */ + public static class ThreadPoolExecutorParams { + private final int corePoolSize; + private final int maxPoolSize; + private final long keepAliveTime; + private final TimeUnit units; + private final String poolName; + private final BlockingQueue workQueue; + + public ThreadPoolExecutorParams(int corePoolSize, int maxPoolSize, long keepAliveTime, + TimeUnit units, String poolName, BlockingQueue workQueue) { + this.corePoolSize = corePoolSize; + this.maxPoolSize = maxPoolSize; + this.keepAliveTime = keepAliveTime; + this.units = units; + this.poolName = poolName; + this.workQueue = workQueue; + } + + public int getCorePoolSize() { + return corePoolSize; + } + + public int getMaxPoolSize() { + return maxPoolSize; + } + + public long getKeepAliveTime() { + return keepAliveTime; + } + + public TimeUnit getUnits() { + return units; + } + + public String getPoolName() { + return poolName; + } + + public BlockingQueue getWorkQueue() { + return workQueue; + } + } + + /** + * Default factory function for constructing {@link ThreadPoolExecutor}s. + */ + public static final Function EXECUTOR_FACTORY = + new Function() { + @Override + public ThreadPoolExecutor apply(ThreadPoolExecutorParams p) { + return new ThreadPoolExecutor(p.getCorePoolSize(), p.getMaxPoolSize(), + p.getKeepAliveTime(), p.getUnits(), p.getWorkQueue(), + new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build()); + } + }; + /** * The first unhandled exception thrown by a worker thread. We save it * and re-throw it from the main thread to detect bugs faster; @@ -136,7 +196,8 @@ public class AbstractQueueVisitor { * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, java.util.concurrent.BlockingQueue)} + * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, + * BlockingQueue)} * @param maxPoolSize the max number of threads in the pool. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. @@ -149,15 +210,41 @@ public class AbstractQueueVisitor { public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit units, boolean failFastOnException, boolean failFastOnInterrupt, String poolName) { - Preconditions.checkNotNull(poolName); + this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException, + failFastOnInterrupt, poolName, EXECUTOR_FACTORY); + } + /** + * Create the AbstractQueueVisitor. + * + * @param concurrent true if concurrency should be enabled. Only set to + * false for debugging. + * @param corePoolSize the core pool size of the thread pool. See + * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, + * BlockingQueue)} + * @param maxPoolSize the max number of threads in the pool. + * @param keepAliveTime the keep-alive time for the thread pool. + * @param units the time units of keepAliveTime. + * @param failFastOnException if true, don't run new actions after an uncaught exception. + * @param failFastOnInterrupt if true, don't run new actions after interrupt. + * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default + * thread naming will be used. + * @param executorFactory the factory for constructing the thread pool if {@code concurrent} is + * true. + */ + public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize, + long keepAliveTime, TimeUnit units, boolean failFastOnException, + boolean failFastOnInterrupt, String poolName, + Function executorFactory) { + Preconditions.checkNotNull(poolName); + Preconditions.checkNotNull(executorFactory); this.concurrent = concurrent; this.failFastOnException = failFastOnException; this.failFastOnInterrupt = failFastOnInterrupt; this.ownThreadPool = true; this.pool = concurrent - ? new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, units, getWorkQueue(), - new ThreadFactoryBuilder().setNameFormat(poolName + " %d").build()) + ? executorFactory.apply(new ThreadPoolExecutorParams(corePoolSize, maxPoolSize, + keepAliveTime, units, poolName, getWorkQueue())) : null; } @@ -167,7 +254,8 @@ public class AbstractQueueVisitor { * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, java.util.concurrent.BlockingQueue)} + * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, + * BlockingQueue)} * @param maxPoolSize the max number of threads in the pool. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. @@ -233,7 +321,8 @@ public class AbstractQueueVisitor { * @param concurrent true if concurrency should be enabled. Only set to * false for debugging. * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, java.util.concurrent.BlockingQueue)} + * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, + * BlockingQueue)} * @param maxPoolSize the max number of threads in the pool. * @param keepAliveTime the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. @@ -249,7 +338,8 @@ public class AbstractQueueVisitor { * Create the AbstractQueueVisitor with concurrency enabled. * * @param corePoolSize the core pool size of the thread pool. See - * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, java.util.concurrent.BlockingQueue)} + * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, + * BlockingQueue)} * @param maxPoolSize the max number of threads in the pool. * @param keepAlive the keep-alive time for the thread pool. * @param units the time units of keepAliveTime. diff --git a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java index fc2a2c7abc..cfe03e2fc6 100644 --- a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java +++ b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java @@ -13,11 +13,17 @@ // limitations under the License. package com.google.devtools.build.skyframe; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor.ThreadPoolExecutorParams; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingNodeVisitor; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingNodeVisitor; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.InvalidationState; +import java.util.concurrent.ThreadPoolExecutor; + +import javax.annotation.Nullable; + /** * Utility class for performing eager invalidation on Skyframe graphs. * @@ -39,47 +45,72 @@ public final class EagerInvalidator { EvaluationProgressReceiver invalidationReceiver, InvalidationState state, boolean traverseGraph, DirtyKeyTracker dirtyKeyTracker) throws InterruptedException { InvalidatingNodeVisitor visitor = - createVisitor(/*delete=*/true, graph, diff, invalidationReceiver, state, traverseGraph, + createDeletingVisitorIfNeeded(graph, diff, invalidationReceiver, state, traverseGraph, dirtyKeyTracker); if (visitor != null) { visitor.run(); } } - /** - * Creates an invalidation visitor that is ready to run. Caller should call #run() on the visitor. - * Allows test classes to keep a reference to the visitor, and await exceptions/interrupts. - */ - @VisibleForTesting - static InvalidatingNodeVisitor createVisitor(boolean delete, DirtiableGraph graph, + @Nullable + static InvalidatingNodeVisitor createDeletingVisitorIfNeeded(DirtiableGraph graph, Iterable diff, EvaluationProgressReceiver invalidationReceiver, InvalidationState state, boolean traverseGraph, DirtyKeyTracker dirtyKeyTracker) { state.update(diff); - if (state.isEmpty()) { - return null; - } - return delete - ? new DeletingNodeVisitor(graph, invalidationReceiver, state, traverseGraph, - dirtyKeyTracker) - : new DirtyingNodeVisitor(graph, invalidationReceiver, state, dirtyKeyTracker); + return state.isEmpty() ? null + : new DeletingNodeVisitor(graph, invalidationReceiver, state, traverseGraph, + dirtyKeyTracker); + } + + @Nullable + static InvalidatingNodeVisitor createInvalidatingVisitorIfNeeded(DirtiableGraph graph, + Iterable diff, EvaluationProgressReceiver invalidationReceiver, + InvalidationState state, DirtyKeyTracker dirtyKeyTracker, + Function executorFactory) { + state.update(diff); + return state.isEmpty() ? null + : new DirtyingNodeVisitor(graph, invalidationReceiver, state, dirtyKeyTracker, + executorFactory); + } + + @Nullable + static InvalidatingNodeVisitor createInvalidatingVisitorIfNeeded(DirtiableGraph graph, + Iterable diff, EvaluationProgressReceiver invalidationReceiver, + InvalidationState state, DirtyKeyTracker dirtyKeyTracker) { + return createInvalidatingVisitorIfNeeded(graph, diff, invalidationReceiver, state, + dirtyKeyTracker, AbstractQueueVisitor.EXECUTOR_FACTORY); } /** - * Invalidates given values and their upward transitive closure in the graph. + * Invalidates given values and their upward transitive closure in the graph, using an executor + * constructed with the provided factory, if necessary. */ public static void invalidate(DirtiableGraph graph, Iterable diff, EvaluationProgressReceiver invalidationReceiver, InvalidationState state, - DirtyKeyTracker dirtyKeyTracker) + DirtyKeyTracker dirtyKeyTracker, + Function executorFactory) throws InterruptedException { // If we are invalidating, we must be in an incremental build by definition, so we must // maintain a consistent graph state by traversing the graph and invalidating transitive // dependencies. If edges aren't present, it would be impossible to check the dependencies of // a dirty node in any case. InvalidatingNodeVisitor visitor = - createVisitor(/*delete=*/false, graph, diff, invalidationReceiver, state, - /*traverseGraph=*/true, dirtyKeyTracker); + createInvalidatingVisitorIfNeeded(graph, diff, invalidationReceiver, state, + dirtyKeyTracker, executorFactory); if (visitor != null) { visitor.run(); } } + + /** + * Invalidates given values and their upward transitive closure in the graph. + */ + public static void invalidate(DirtiableGraph graph, Iterable diff, + EvaluationProgressReceiver invalidationReceiver, InvalidationState state, + DirtyKeyTracker dirtyKeyTracker) + throws InterruptedException { + invalidate(graph, diff, invalidationReceiver, state, dirtyKeyTracker, + AbstractQueueVisitor.EXECUTOR_FACTORY); + } + } diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java index 73044a706d..6ea61d68b0 100644 --- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java @@ -27,6 +27,7 @@ import com.google.devtools.build.lib.util.Pair; import java.util.Map; import java.util.Set; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; @@ -71,13 +72,22 @@ public abstract class InvalidatingNodeVisitor extends AbstractQueueVisitor { protected InvalidatingNodeVisitor( DirtiableGraph graph, @Nullable EvaluationProgressReceiver invalidationReceiver, InvalidationState state, DirtyKeyTracker dirtyKeyTracker) { - super(/*concurrent*/true, - /*corePoolSize*/DEFAULT_THREAD_COUNT, - /*maxPoolSize*/DEFAULT_THREAD_COUNT, - 1, TimeUnit.SECONDS, - /*failFastOnException*/true, - /*failFastOnInterrupt*/true, - "skyframe-invalidator"); + this(graph, invalidationReceiver, state, dirtyKeyTracker, EXECUTOR_FACTORY); + } + + protected InvalidatingNodeVisitor( + DirtiableGraph graph, @Nullable EvaluationProgressReceiver invalidationReceiver, + InvalidationState state, DirtyKeyTracker dirtyKeyTracker, + Function executorFactory) { + super(/*concurrent=*/true, + /*corePoolSize=*/DEFAULT_THREAD_COUNT, + /*maxPoolSize=*/DEFAULT_THREAD_COUNT, + /*keepAliveTime=*/1, + /*units=*/TimeUnit.SECONDS, + /*failFastOnException=*/true, + /*failFastOnInterrupt=*/true, + "skyframe-invalidator", + executorFactory); this.graph = Preconditions.checkNotNull(graph); this.invalidationReceiver = invalidationReceiver; this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker); @@ -274,8 +284,9 @@ public abstract class InvalidatingNodeVisitor extends AbstractQueueVisitor { protected DirtyingNodeVisitor(DirtiableGraph graph, EvaluationProgressReceiver invalidationReceiver, InvalidationState state, - DirtyKeyTracker dirtyKeyTracker) { - super(graph, invalidationReceiver, state, dirtyKeyTracker); + DirtyKeyTracker dirtyKeyTracker, + Function executorFactory) { + super(graph, invalidationReceiver, state, dirtyKeyTracker, executorFactory); } @Override diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java index 85646375ed..31bf8aabe5 100644 --- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java +++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java @@ -517,9 +517,10 @@ public class EagerInvalidatorTest { @Override protected void invalidate(DirtiableGraph graph, EvaluationProgressReceiver invalidationReceiver, SkyKey... keys) throws InterruptedException { + Iterable diff = ImmutableList.copyOf(keys); InvalidatingNodeVisitor invalidatingVisitor = - EagerInvalidator.createVisitor(/*delete=*/true, graph, ImmutableList.copyOf(keys), - invalidationReceiver, state, true, dirtyKeyTracker); + EagerInvalidator.createDeletingVisitorIfNeeded(graph, diff, invalidationReceiver, state, + true, dirtyKeyTracker); if (invalidatingVisitor != null) { visitor.set(invalidatingVisitor); invalidatingVisitor.run(); @@ -552,17 +553,16 @@ public class EagerInvalidatorTest { TrackingInvalidationReceiver receiver = new TrackingInvalidationReceiver(); // Dirty the node, and ensure that the tracker is aware of it: - InvalidatingNodeVisitor dirtyingVisitor = - EagerInvalidator.createVisitor(/*delete=*/false, graph, ImmutableList.of(skyKey("a")), - receiver, new DirtyingInvalidationState(), true, dirtyKeyTracker); - dirtyingVisitor.run(); + Iterable diff1 = ImmutableList.of(skyKey("a")); + InvalidationState state1 = new DirtyingInvalidationState(); + Preconditions.checkNotNull(EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff1, + receiver, state1, dirtyKeyTracker)).run(); assertThat(dirtyKeyTracker.getDirtyKeys()).containsExactly(skyKey("a"), skyKey("ab")); // Delete the node, and ensure that the tracker is no longer tracking it: - InvalidatingNodeVisitor deletingVisitor = - EagerInvalidator.createVisitor(/*delete=*/true, graph, ImmutableList.of(skyKey("a")), - receiver, state, true, dirtyKeyTracker); - deletingVisitor.run(); + Iterable diff = ImmutableList.of(skyKey("a")); + Preconditions.checkNotNull(EagerInvalidator.createDeletingVisitorIfNeeded(graph, diff, + receiver, state, true, dirtyKeyTracker)).run(); assertThat(dirtyKeyTracker.getDirtyKeys()).containsExactly(skyKey("ab")); } } @@ -575,9 +575,10 @@ public class EagerInvalidatorTest { @Override protected void invalidate(DirtiableGraph graph, EvaluationProgressReceiver invalidationReceiver, SkyKey... keys) throws InterruptedException { + Iterable diff = ImmutableList.copyOf(keys); InvalidatingNodeVisitor invalidatingVisitor = - EagerInvalidator.createVisitor(/*delete=*/false, graph, ImmutableList.copyOf(keys), - invalidationReceiver, state, true, dirtyKeyTracker); + EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff, invalidationReceiver, + state, dirtyKeyTracker); if (invalidatingVisitor != null) { visitor.set(invalidatingVisitor); invalidatingVisitor.run(); -- cgit v1.2.3