diff options
9 files changed, 78 insertions, 210 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index 98e9efba08..f82ead9512 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; -import javax.annotation.Nullable; /** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */ public class AbstractQueueVisitor implements QuiescingExecutor { @@ -86,22 +85,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private volatile Throwable catastrophe; /** - * Enables concurrency. For debugging or testing, set this to {@code false} to avoid thread - * creation and concurrency. Any deviation in observed behaviour is a bug. - */ - private final boolean concurrent; - - /** * An object used in the manner of a {@link java.util.concurrent.locks.Condition} object, for the * condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}. * TODO(bazel-team): Replace with an actual {@link java.util.concurrent.locks.Condition} object. */ private final Object zeroRemainingTasks = new Object(); - /** - * If {@link #concurrent} is {@code true}, then this is a counter of the number of {@link - * Runnable}s {@link #execute}-d that have not finished evaluation. - */ + /** The number of {@link Runnable}s {@link #execute}-d that have not finished evaluation. */ private final AtomicLong remainingTasks = new AtomicLong(0); /** @@ -116,8 +106,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { /** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */ private final AtomicLongMap<Thread> jobs = AtomicLongMap.create(); - /** The {@link ExecutorService}. If !{@code concurrent}, this may be {@code null}. */ - @Nullable private final ExecutorService executorService; + private final ExecutorService executorService; /** * Flag used to record when the main thread (the thread which called {@link #awaitQuiescence}) @@ -145,77 +134,24 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName()); - /** - * Create the {@link AbstractQueueVisitor}. - * - * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for - * debugging. - * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code - * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code - * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. - * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable. - * @param units the time units of keepAliveTime. - * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. - * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code - * null}, default thread naming will be used. - */ - public AbstractQueueVisitor( - boolean concurrent, + private static ExecutorService createExecutorService( int parallelism, long keepAliveTime, TimeUnit units, - boolean failFastOnException, - String poolName) { - this( - concurrent, - parallelism, - keepAliveTime, - units, - failFastOnException, - poolName, - EXECUTOR_FACTORY, - ErrorClassifier.DEFAULT); - } - - /** - * Create the {@link AbstractQueueVisitor}. - * - * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for - * debugging. - * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code - * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code corePoolSize} and - * {@code maximumPoolSize} in {@link ThreadPoolExecutor}. - * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable. - * @param units the time units of keepAliveTime. - * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. - * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code - * null}, default thread naming will be used. - * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. - */ - public AbstractQueueVisitor( - boolean concurrent, - int parallelism, - long keepAliveTime, - TimeUnit units, - boolean failFastOnException, String poolName, - ErrorClassifier errorClassifier) { - this( - concurrent, - parallelism, - keepAliveTime, - units, - failFastOnException, - poolName, - EXECUTOR_FACTORY, - errorClassifier); + Function<ExecutorParams, ? extends ExecutorService> executorFactory) { + return Preconditions.checkNotNull(executorFactory) + .apply( + new ExecutorParams( + parallelism, + keepAliveTime, + units, + Preconditions.checkNotNull(poolName), + new BlockingStack<Runnable>())); } - /** * Create the {@link AbstractQueueVisitor}. * - * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for - * debugging. * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code corePoolSize} and * {@code maximumPoolSize} in {@link ThreadPoolExecutor}. @@ -224,12 +160,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code * null}, default thread naming will be used. - * @param executorFactory the factory for constructing the executor service if {@code concurrent} - * is {@code true}. + * @param executorFactory the factory for constructing the executor service. * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. */ public AbstractQueueVisitor( - boolean concurrent, int parallelism, long keepAliveTime, TimeUnit units, @@ -237,71 +171,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor { String poolName, Function<ExecutorParams, ? extends ExecutorService> executorFactory, ErrorClassifier errorClassifier) { - Preconditions.checkNotNull(poolName); - Preconditions.checkNotNull(executorFactory); - Preconditions.checkNotNull(errorClassifier); - this.concurrent = concurrent; - this.failFastOnException = failFastOnException; - this.ownExecutorService = true; - this.executorService = - concurrent - ? executorFactory.apply( - new ExecutorParams( - parallelism, keepAliveTime, units, poolName, new BlockingStack<Runnable>())) - : null; - this.errorClassifier = errorClassifier; - } - - /** - * Create the {@link AbstractQueueVisitor}. - * - * @param executorService The {@link ExecutorService} to use. - * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to - * this class. The service will be shut down after a - * call to {@link #awaitQuiescence}. Callers must not shutdown the - * {@link ExecutorService} while queue visitors use it. - * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. - */ - public AbstractQueueVisitor( - ExecutorService executorService, boolean shutdownOnCompletion, boolean failFastOnException) { this( - /*concurrent=*/ true, - executorService, - shutdownOnCompletion, + createExecutorService(parallelism, keepAliveTime, units, poolName, executorFactory), + true, failFastOnException, - ErrorClassifier.DEFAULT); - } - - /** - * Create the {@link AbstractQueueVisitor}. - * - * @param concurrent if {@code false}, run tasks inline instead of using the {@link - * ExecutorService}. - * @param executorService The {@link ExecutorService} to use. - * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to - * this class. The service will be shut down after a - * call to {@link #awaitQuiescence}. Callers must not shut down the - * {@link ExecutorService} while queue visitors use it. - * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. - */ - public AbstractQueueVisitor( - boolean concurrent, - ExecutorService executorService, - boolean shutdownOnCompletion, - boolean failFastOnException) { - Preconditions.checkArgument(executorService != null || !concurrent); - this.concurrent = concurrent; - this.failFastOnException = failFastOnException; - this.ownExecutorService = shutdownOnCompletion; - this.executorService = executorService; - this.errorClassifier = ErrorClassifier.DEFAULT; + errorClassifier); } /** * Create the AbstractQueueVisitor. * - * @param concurrent if {@code false}, run tasks inline instead of using the {@link - * ExecutorService}. * @param executorService The {@link ExecutorService} to use. * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to * this class. The service will be shut down after a call to {@link #awaitQuiescence}. Callers @@ -309,56 +188,23 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. */ - public AbstractQueueVisitor( - boolean concurrent, + protected AbstractQueueVisitor( ExecutorService executorService, boolean shutdownOnCompletion, boolean failFastOnException, ErrorClassifier errorClassifier) { - Preconditions.checkArgument(executorService != null || !concurrent); - this.concurrent = concurrent; this.failFastOnException = failFastOnException; this.ownExecutorService = shutdownOnCompletion; - this.executorService = executorService; - this.errorClassifier = errorClassifier; + this.executorService = Preconditions.checkNotNull(executorService); + this.errorClassifier = Preconditions.checkNotNull(errorClassifier); } - /** - * Create the {@code AbstractQueueVisitor} with concurrency enabled. - * - * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code - * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code - * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. - * @param keepAlive the keep-alive time for the {@link ExecutorService}, if applicable. - * @param units the time units of keepAliveTime. - * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code - * null}, default thread naming will be used. - */ - public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) { - this( - true, - parallelism, - keepAlive, - units, - false, - poolName, - EXECUTOR_FACTORY, - ErrorClassifier.DEFAULT); - } - - @Override public final void awaitQuiescence(boolean interruptWorkers) throws InterruptedException { - if (concurrent) { - awaitTermination(interruptWorkers); - } else { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException(); - } - } + awaitTermination(interruptWorkers); } - /** Schedules a call. Called in a worker thread if concurrent. */ + /** Schedules a call. Called in a worker thread. */ @Override public final void execute(Runnable runnable) { if (runConcurrently()) { @@ -391,7 +237,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * asynchronously versus in-thread. */ protected boolean runConcurrently() { - return concurrent; + return true; } /** diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java index ca2b38da44..295e758628 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java @@ -28,7 +28,6 @@ public class ForkJoinQuiescingExecutor extends AbstractQueueVisitor { private ForkJoinQuiescingExecutor( ForkJoinPool forkJoinPool, ErrorClassifier errorClassifier, boolean shutdownOnCompletion) { super( - /*concurrent=*/ true, forkJoinPool, shutdownOnCompletion, /*failFastOnException=*/ true, diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java index fc51c9a9d6..d3b2c140f4 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java @@ -23,6 +23,7 @@ import com.google.common.collect.Multimaps; import com.google.common.collect.SetMultimap; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.devtools.build.lib.concurrent.ErrorClassifier; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.packages.AggregatingAttributeMapper; @@ -240,8 +241,6 @@ final class LabelVisitor { private final Iterable<TargetEdgeObserver> observers; private final TargetEdgeErrorObserver errorObserver; private final AtomicBoolean stopNewActions = new AtomicBoolean(false); - private static final boolean CONCURRENT = true; - public Visitor( ExtendedEventHandler eventHandler, @@ -252,7 +251,14 @@ final class LabelVisitor { // Observing the loading phase of a typical large package (with all subpackages) shows // maximum thread-level concurrency of ~20. Limiting the total number of threads to 200 is // therefore conservative and should help us avoid hitting native limits. - super(CONCURRENT, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing, THREAD_NAME); + super( + parallelThreads, + 1L, + TimeUnit.SECONDS, + !keepGoing, + THREAD_NAME, + AbstractQueueVisitor.EXECUTOR_FACTORY, + ErrorClassifier.DEFAULT); this.eventHandler = eventHandler; this.maxDepth = maxDepth; this.errorObserver = new TargetEdgeErrorObserver(); diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 0052892130..07249b50fb 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -549,7 +549,6 @@ class ParallelSkyQueryUtils { private class BFSVisitingTaskExecutor extends AbstractQueueVisitor { private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) { super( - /*concurrent=*/ true, /*executorService=*/ executor, // Leave the thread pool active for other current and future callers. /*shutdownOnCompletion=*/ false, diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java index d7bb2d2cab..30314f5fa7 100644 --- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java @@ -100,7 +100,6 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> { Function<ExecutorParams, ? extends ExecutorService> executorFactory) { this.executor = new AbstractQueueVisitor( - /*concurrent=*/ true, /*parallelism=*/ DEFAULT_THREAD_COUNT, /*keepAliveTime=*/ 1, /*units=*/ TimeUnit.SECONDS, diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java index 722410e77b..d528a830cd 100644 --- a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java @@ -78,12 +78,12 @@ class NodeEntryVisitor { Function<SkyKey, Runnable> runnableMaker) { quiescingExecutor = new AbstractQueueVisitor( - /*concurrent*/ true, threadCount, /*keepAliveTime=*/ 1, TimeUnit.SECONDS, /*failFastOnException*/ true, "skyframe-evaluator", + AbstractQueueVisitor.EXECUTOR_FACTORY, NODE_ENTRY_VISITOR_ERROR_CLASSIFIER); this.progressReceiver = progressReceiver; this.runnableMaker = runnableMaker; diff --git a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java index 456b394a85..1ed396aae6 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java +++ b/src/test/java/com/google/devtools/build/lib/actions/ConcurrentMultimapWithHeadElementTest.java @@ -20,17 +20,16 @@ import static org.junit.Assert.assertTrue; import com.google.common.testing.GcFinalization; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.devtools.build.lib.concurrent.ErrorClassifier; import com.google.devtools.build.lib.testutil.TestThread; import com.google.devtools.build.lib.testutil.TestUtils; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - import java.lang.ref.WeakReference; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; /** * Tests for ConcurrentMultimapWithHeadElement. @@ -128,19 +127,20 @@ public class ConcurrentMultimapWithHeadElementTest { } } - private class StressTester extends AbstractQueueVisitor { + private static class StressTester extends AbstractQueueVisitor { private final ConcurrentMultimapWithHeadElement<Boolean, Integer> multimap = new ConcurrentMultimapWithHeadElement<>(); private final AtomicInteger actionCount = new AtomicInteger(0); private StressTester() { super( - /*concurrent=*/ true, 200, 1, TimeUnit.SECONDS, /*failFastOnException=*/ true, - "action-graph-test"); + "action-graph-test", + AbstractQueueVisitor.EXECUTOR_FACTORY, + ErrorClassifier.DEFAULT); } private void addAndRemove(final Boolean key, final Integer add, final Integer remove) { diff --git a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java index a527549e6c..d83cf815d3 100644 --- a/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java +++ b/src/test/java/com/google/devtools/build/lib/actions/MapBasedActionGraphTest.java @@ -21,18 +21,17 @@ import com.google.devtools.build.lib.actions.MutableActionGraph.ActionConflictEx import com.google.devtools.build.lib.actions.util.ActionsTestUtil.UncheckedActionConflictException; import com.google.devtools.build.lib.actions.util.TestAction; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.devtools.build.lib.concurrent.ErrorClassifier; import com.google.devtools.build.lib.util.BlazeClock; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; /** * Tests for {@link MapBasedActionGraph}. @@ -73,7 +72,7 @@ public class MapBasedActionGraphTest { actionGraph.unregisterAction(action); } - private class ActionRegisterer extends AbstractQueueVisitor { + private static class ActionRegisterer extends AbstractQueueVisitor { private final MutableActionGraph graph = new MapBasedActionGraph(); private final Artifact output; // Just to occasionally add actions that were already present. @@ -82,12 +81,13 @@ public class MapBasedActionGraphTest { private ActionRegisterer() { super( - /*concurrent=*/ true, 200, 1, TimeUnit.SECONDS, /*failFastOnException=*/ true, - "action-graph-test"); + "action-graph-test", + AbstractQueueVisitor.EXECUTOR_FACTORY, + ErrorClassifier.DEFAULT); FileSystem fileSystem = new InMemoryFileSystem(BlazeClock.instance()); Path path = fileSystem.getPath("/root/foo"); output = new Artifact(path, Root.asDerivedRoot(path)); diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java index 7be04c4547..5aa63dd94c 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java @@ -462,7 +462,6 @@ public class AbstractQueueVisitorTest { }; AbstractQueueVisitor visitor = new AbstractQueueVisitor( - /*concurrent=*/ true, executor, /*shutdownOnCompletion=*/ true, /*failFastOnException=*/ false, @@ -557,11 +556,18 @@ public class AbstractQueueVisitorTest { private final Object lock = new Object(); public CountingQueueVisitor() { - super(/*parallelism=*/ 5, /*keepAlive=*/ 3L, TimeUnit.SECONDS, THREAD_NAME); + super( + /*parallelism=*/ 5, + /*keepAlive=*/ 3L, + TimeUnit.SECONDS, + /*failFast=*/ false, + THREAD_NAME, + AbstractQueueVisitor.EXECUTOR_FACTORY, + ErrorClassifier.DEFAULT); } - public CountingQueueVisitor(ThreadPoolExecutor executor) { - super(executor, false, true); + CountingQueueVisitor(ThreadPoolExecutor executor) { + super(executor, false, true, ErrorClassifier.DEFAULT); } public void enqueue() { @@ -588,23 +594,36 @@ public class AbstractQueueVisitorTest { private final static String THREAD_NAME = "BlazeTest ConcreteQueueVisitor"; - public ConcreteQueueVisitor() { - super(5, 3L, TimeUnit.SECONDS, THREAD_NAME); + ConcreteQueueVisitor() { + super( + 5, + 3L, + TimeUnit.SECONDS, + /*failFast=*/ false, + THREAD_NAME, + AbstractQueueVisitor.EXECUTOR_FACTORY, + ErrorClassifier.DEFAULT); } - public ConcreteQueueVisitor(boolean failFast) { - super(true, 5, 3L, TimeUnit.SECONDS, failFast, THREAD_NAME); + ConcreteQueueVisitor(boolean failFast) { + super( + 5, + 3L, + TimeUnit.SECONDS, + failFast, + THREAD_NAME, + AbstractQueueVisitor.EXECUTOR_FACTORY, + ErrorClassifier.DEFAULT); } - public ConcreteQueueVisitor(ThreadPoolExecutor executor, boolean failFast) { - super(executor, /*shutdownOnCompletion=*/ false, failFast); + ConcreteQueueVisitor(ThreadPoolExecutor executor, boolean failFast) { + super(executor, /*shutdownOnCompletion=*/ false, failFast, ErrorClassifier.DEFAULT); } } private static AbstractQueueVisitor createQueueVisitorWithConstantErrorClassification( ThreadPoolExecutor executor, final ErrorClassification classification) { return new AbstractQueueVisitor( - /*concurrent=*/ true, executor, /*shutdownOnCompletion=*/ true, /*failFastOnException=*/ false, |