aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2015-08-05 22:02:59 +0000
committerGravatar David Chen <dzc@google.com>2015-08-06 22:11:23 +0000
commite3c88deb76d1a24a6925be80ea49ab9058069d02 (patch)
treec65e090c513aad41c6ab0bc3dcf2266937111bba /src
parent96f9ea0e0c7a7ed2ab6f48aed2e701b33a24cada (diff)
Add threadpool injectability to invalidator
-- MOS_MIGRATED_REVID=99961435
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java104
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java69
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java29
-rw-r--r--src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java25
4 files changed, 180 insertions, 47 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index a937d178dd..c7ba0f7508 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -15,6 +15,7 @@
package com.google.devtools.build.lib.concurrent;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
@@ -53,6 +54,65 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class AbstractQueueVisitor {
/**
+ * Configuration parameters for {@link ThreadPoolExecutor} construction.
+ */
+ public static class ThreadPoolExecutorParams {
+ private final int corePoolSize;
+ private final int maxPoolSize;
+ private final long keepAliveTime;
+ private final TimeUnit units;
+ private final String poolName;
+ private final BlockingQueue<Runnable> workQueue;
+
+ public ThreadPoolExecutorParams(int corePoolSize, int maxPoolSize, long keepAliveTime,
+ TimeUnit units, String poolName, BlockingQueue<Runnable> workQueue) {
+ this.corePoolSize = corePoolSize;
+ this.maxPoolSize = maxPoolSize;
+ this.keepAliveTime = keepAliveTime;
+ this.units = units;
+ this.poolName = poolName;
+ this.workQueue = workQueue;
+ }
+
+ public int getCorePoolSize() {
+ return corePoolSize;
+ }
+
+ public int getMaxPoolSize() {
+ return maxPoolSize;
+ }
+
+ public long getKeepAliveTime() {
+ return keepAliveTime;
+ }
+
+ public TimeUnit getUnits() {
+ return units;
+ }
+
+ public String getPoolName() {
+ return poolName;
+ }
+
+ public BlockingQueue<Runnable> getWorkQueue() {
+ return workQueue;
+ }
+ }
+
+ /**
+ * Default factory function for constructing {@link ThreadPoolExecutor}s.
+ */
+ public static final Function<ThreadPoolExecutorParams, ThreadPoolExecutor> EXECUTOR_FACTORY =
+ new Function<ThreadPoolExecutorParams, ThreadPoolExecutor>() {
+ @Override
+ public ThreadPoolExecutor apply(ThreadPoolExecutorParams p) {
+ return new ThreadPoolExecutor(p.getCorePoolSize(), p.getMaxPoolSize(),
+ p.getKeepAliveTime(), p.getUnits(), p.getWorkQueue(),
+ new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build());
+ }
+ };
+
+ /**
* The first unhandled exception thrown by a worker thread. We save it
* and re-throw it from the main thread to detect bugs faster;
* otherwise worker threads just quietly die.
@@ -136,7 +196,8 @@ public class AbstractQueueVisitor {
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
* @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, java.util.concurrent.BlockingQueue)}
+ * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
+ * BlockingQueue)}
* @param maxPoolSize the max number of threads in the pool.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
@@ -149,15 +210,41 @@ public class AbstractQueueVisitor {
public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
long keepAliveTime, TimeUnit units, boolean failFastOnException,
boolean failFastOnInterrupt, String poolName) {
- Preconditions.checkNotNull(poolName);
+ this(concurrent, corePoolSize, maxPoolSize, keepAliveTime, units, failFastOnException,
+ failFastOnInterrupt, poolName, EXECUTOR_FACTORY);
+ }
+ /**
+ * Create the AbstractQueueVisitor.
+ *
+ * @param concurrent true if concurrency should be enabled. Only set to
+ * false for debugging.
+ * @param corePoolSize the core pool size of the thread pool. See
+ * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
+ * BlockingQueue)}
+ * @param maxPoolSize the max number of threads in the pool.
+ * @param keepAliveTime the keep-alive time for the thread pool.
+ * @param units the time units of keepAliveTime.
+ * @param failFastOnException if true, don't run new actions after an uncaught exception.
+ * @param failFastOnInterrupt if true, don't run new actions after interrupt.
+ * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default
+ * thread naming will be used.
+ * @param executorFactory the factory for constructing the thread pool if {@code concurrent} is
+ * true.
+ */
+ public AbstractQueueVisitor(boolean concurrent, int corePoolSize, int maxPoolSize,
+ long keepAliveTime, TimeUnit units, boolean failFastOnException,
+ boolean failFastOnInterrupt, String poolName,
+ Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) {
+ Preconditions.checkNotNull(poolName);
+ Preconditions.checkNotNull(executorFactory);
this.concurrent = concurrent;
this.failFastOnException = failFastOnException;
this.failFastOnInterrupt = failFastOnInterrupt;
this.ownThreadPool = true;
this.pool = concurrent
- ? new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, units, getWorkQueue(),
- new ThreadFactoryBuilder().setNameFormat(poolName + " %d").build())
+ ? executorFactory.apply(new ThreadPoolExecutorParams(corePoolSize, maxPoolSize,
+ keepAliveTime, units, poolName, getWorkQueue()))
: null;
}
@@ -167,7 +254,8 @@ public class AbstractQueueVisitor {
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
* @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, java.util.concurrent.BlockingQueue)}
+ * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
+ * BlockingQueue)}
* @param maxPoolSize the max number of threads in the pool.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
@@ -233,7 +321,8 @@ public class AbstractQueueVisitor {
* @param concurrent true if concurrency should be enabled. Only set to
* false for debugging.
* @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, java.util.concurrent.BlockingQueue)}
+ * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
+ * BlockingQueue)}
* @param maxPoolSize the max number of threads in the pool.
* @param keepAliveTime the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
@@ -249,7 +338,8 @@ public class AbstractQueueVisitor {
* Create the AbstractQueueVisitor with concurrency enabled.
*
* @param corePoolSize the core pool size of the thread pool. See
- * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, java.util.concurrent.BlockingQueue)}
+ * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit,
+ * BlockingQueue)}
* @param maxPoolSize the max number of threads in the pool.
* @param keepAlive the keep-alive time for the thread pool.
* @param units the time units of keepAliveTime.
diff --git a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
index fc2a2c7abc..cfe03e2fc6 100644
--- a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java
@@ -13,11 +13,17 @@
// limitations under the License.
package com.google.devtools.build.skyframe;
-import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor.ThreadPoolExecutorParams;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingNodeVisitor;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingNodeVisitor;
import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.InvalidationState;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import javax.annotation.Nullable;
+
/**
* Utility class for performing eager invalidation on Skyframe graphs.
*
@@ -39,47 +45,72 @@ public final class EagerInvalidator {
EvaluationProgressReceiver invalidationReceiver, InvalidationState state,
boolean traverseGraph, DirtyKeyTracker dirtyKeyTracker) throws InterruptedException {
InvalidatingNodeVisitor visitor =
- createVisitor(/*delete=*/true, graph, diff, invalidationReceiver, state, traverseGraph,
+ createDeletingVisitorIfNeeded(graph, diff, invalidationReceiver, state, traverseGraph,
dirtyKeyTracker);
if (visitor != null) {
visitor.run();
}
}
- /**
- * Creates an invalidation visitor that is ready to run. Caller should call #run() on the visitor.
- * Allows test classes to keep a reference to the visitor, and await exceptions/interrupts.
- */
- @VisibleForTesting
- static InvalidatingNodeVisitor createVisitor(boolean delete, DirtiableGraph graph,
+ @Nullable
+ static InvalidatingNodeVisitor createDeletingVisitorIfNeeded(DirtiableGraph graph,
Iterable<SkyKey> diff, EvaluationProgressReceiver invalidationReceiver,
InvalidationState state, boolean traverseGraph, DirtyKeyTracker dirtyKeyTracker) {
state.update(diff);
- if (state.isEmpty()) {
- return null;
- }
- return delete
- ? new DeletingNodeVisitor(graph, invalidationReceiver, state, traverseGraph,
- dirtyKeyTracker)
- : new DirtyingNodeVisitor(graph, invalidationReceiver, state, dirtyKeyTracker);
+ return state.isEmpty() ? null
+ : new DeletingNodeVisitor(graph, invalidationReceiver, state, traverseGraph,
+ dirtyKeyTracker);
+ }
+
+ @Nullable
+ static InvalidatingNodeVisitor createInvalidatingVisitorIfNeeded(DirtiableGraph graph,
+ Iterable<SkyKey> diff, EvaluationProgressReceiver invalidationReceiver,
+ InvalidationState state, DirtyKeyTracker dirtyKeyTracker,
+ Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) {
+ state.update(diff);
+ return state.isEmpty() ? null
+ : new DirtyingNodeVisitor(graph, invalidationReceiver, state, dirtyKeyTracker,
+ executorFactory);
+ }
+
+ @Nullable
+ static InvalidatingNodeVisitor createInvalidatingVisitorIfNeeded(DirtiableGraph graph,
+ Iterable<SkyKey> diff, EvaluationProgressReceiver invalidationReceiver,
+ InvalidationState state, DirtyKeyTracker dirtyKeyTracker) {
+ return createInvalidatingVisitorIfNeeded(graph, diff, invalidationReceiver, state,
+ dirtyKeyTracker, AbstractQueueVisitor.EXECUTOR_FACTORY);
}
/**
- * Invalidates given values and their upward transitive closure in the graph.
+ * Invalidates given values and their upward transitive closure in the graph, using an executor
+ * constructed with the provided factory, if necessary.
*/
public static void invalidate(DirtiableGraph graph, Iterable<SkyKey> diff,
EvaluationProgressReceiver invalidationReceiver, InvalidationState state,
- DirtyKeyTracker dirtyKeyTracker)
+ DirtyKeyTracker dirtyKeyTracker,
+ Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory)
throws InterruptedException {
// If we are invalidating, we must be in an incremental build by definition, so we must
// maintain a consistent graph state by traversing the graph and invalidating transitive
// dependencies. If edges aren't present, it would be impossible to check the dependencies of
// a dirty node in any case.
InvalidatingNodeVisitor visitor =
- createVisitor(/*delete=*/false, graph, diff, invalidationReceiver, state,
- /*traverseGraph=*/true, dirtyKeyTracker);
+ createInvalidatingVisitorIfNeeded(graph, diff, invalidationReceiver, state,
+ dirtyKeyTracker, executorFactory);
if (visitor != null) {
visitor.run();
}
}
+
+ /**
+ * Invalidates given values and their upward transitive closure in the graph.
+ */
+ public static void invalidate(DirtiableGraph graph, Iterable<SkyKey> diff,
+ EvaluationProgressReceiver invalidationReceiver, InvalidationState state,
+ DirtyKeyTracker dirtyKeyTracker)
+ throws InterruptedException {
+ invalidate(graph, diff, invalidationReceiver, state, dirtyKeyTracker,
+ AbstractQueueVisitor.EXECUTOR_FACTORY);
+ }
+
}
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index 73044a706d..6ea61d68b0 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -27,6 +27,7 @@ import com.google.devtools.build.lib.util.Pair;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
@@ -71,13 +72,22 @@ public abstract class InvalidatingNodeVisitor extends AbstractQueueVisitor {
protected InvalidatingNodeVisitor(
DirtiableGraph graph, @Nullable EvaluationProgressReceiver invalidationReceiver,
InvalidationState state, DirtyKeyTracker dirtyKeyTracker) {
- super(/*concurrent*/true,
- /*corePoolSize*/DEFAULT_THREAD_COUNT,
- /*maxPoolSize*/DEFAULT_THREAD_COUNT,
- 1, TimeUnit.SECONDS,
- /*failFastOnException*/true,
- /*failFastOnInterrupt*/true,
- "skyframe-invalidator");
+ this(graph, invalidationReceiver, state, dirtyKeyTracker, EXECUTOR_FACTORY);
+ }
+
+ protected InvalidatingNodeVisitor(
+ DirtiableGraph graph, @Nullable EvaluationProgressReceiver invalidationReceiver,
+ InvalidationState state, DirtyKeyTracker dirtyKeyTracker,
+ Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) {
+ super(/*concurrent=*/true,
+ /*corePoolSize=*/DEFAULT_THREAD_COUNT,
+ /*maxPoolSize=*/DEFAULT_THREAD_COUNT,
+ /*keepAliveTime=*/1,
+ /*units=*/TimeUnit.SECONDS,
+ /*failFastOnException=*/true,
+ /*failFastOnInterrupt=*/true,
+ "skyframe-invalidator",
+ executorFactory);
this.graph = Preconditions.checkNotNull(graph);
this.invalidationReceiver = invalidationReceiver;
this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker);
@@ -274,8 +284,9 @@ public abstract class InvalidatingNodeVisitor extends AbstractQueueVisitor {
protected DirtyingNodeVisitor(DirtiableGraph graph,
EvaluationProgressReceiver invalidationReceiver, InvalidationState state,
- DirtyKeyTracker dirtyKeyTracker) {
- super(graph, invalidationReceiver, state, dirtyKeyTracker);
+ DirtyKeyTracker dirtyKeyTracker,
+ Function<ThreadPoolExecutorParams, ThreadPoolExecutor> executorFactory) {
+ super(graph, invalidationReceiver, state, dirtyKeyTracker, executorFactory);
}
@Override
diff --git a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
index 85646375ed..31bf8aabe5 100644
--- a/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
+++ b/src/test/java/com/google/devtools/build/skyframe/EagerInvalidatorTest.java
@@ -517,9 +517,10 @@ public class EagerInvalidatorTest {
@Override
protected void invalidate(DirtiableGraph graph, EvaluationProgressReceiver invalidationReceiver,
SkyKey... keys) throws InterruptedException {
+ Iterable<SkyKey> diff = ImmutableList.copyOf(keys);
InvalidatingNodeVisitor invalidatingVisitor =
- EagerInvalidator.createVisitor(/*delete=*/true, graph, ImmutableList.copyOf(keys),
- invalidationReceiver, state, true, dirtyKeyTracker);
+ EagerInvalidator.createDeletingVisitorIfNeeded(graph, diff, invalidationReceiver, state,
+ true, dirtyKeyTracker);
if (invalidatingVisitor != null) {
visitor.set(invalidatingVisitor);
invalidatingVisitor.run();
@@ -552,17 +553,16 @@ public class EagerInvalidatorTest {
TrackingInvalidationReceiver receiver = new TrackingInvalidationReceiver();
// Dirty the node, and ensure that the tracker is aware of it:
- InvalidatingNodeVisitor dirtyingVisitor =
- EagerInvalidator.createVisitor(/*delete=*/false, graph, ImmutableList.of(skyKey("a")),
- receiver, new DirtyingInvalidationState(), true, dirtyKeyTracker);
- dirtyingVisitor.run();
+ Iterable<SkyKey> diff1 = ImmutableList.of(skyKey("a"));
+ InvalidationState state1 = new DirtyingInvalidationState();
+ Preconditions.checkNotNull(EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff1,
+ receiver, state1, dirtyKeyTracker)).run();
assertThat(dirtyKeyTracker.getDirtyKeys()).containsExactly(skyKey("a"), skyKey("ab"));
// Delete the node, and ensure that the tracker is no longer tracking it:
- InvalidatingNodeVisitor deletingVisitor =
- EagerInvalidator.createVisitor(/*delete=*/true, graph, ImmutableList.of(skyKey("a")),
- receiver, state, true, dirtyKeyTracker);
- deletingVisitor.run();
+ Iterable<SkyKey> diff = ImmutableList.of(skyKey("a"));
+ Preconditions.checkNotNull(EagerInvalidator.createDeletingVisitorIfNeeded(graph, diff,
+ receiver, state, true, dirtyKeyTracker)).run();
assertThat(dirtyKeyTracker.getDirtyKeys()).containsExactly(skyKey("ab"));
}
}
@@ -575,9 +575,10 @@ public class EagerInvalidatorTest {
@Override
protected void invalidate(DirtiableGraph graph, EvaluationProgressReceiver invalidationReceiver,
SkyKey... keys) throws InterruptedException {
+ Iterable<SkyKey> diff = ImmutableList.copyOf(keys);
InvalidatingNodeVisitor invalidatingVisitor =
- EagerInvalidator.createVisitor(/*delete=*/false, graph, ImmutableList.copyOf(keys),
- invalidationReceiver, state, true, dirtyKeyTracker);
+ EagerInvalidator.createInvalidatingVisitorIfNeeded(graph, diff, invalidationReceiver,
+ state, dirtyKeyTracker);
if (invalidatingVisitor != null) {
visitor.set(invalidatingVisitor);
invalidatingVisitor.run();