aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main
diff options
context:
space:
mode:
authorGravatar janakr <janakr@google.com>2017-05-08 14:49:00 -0400
committerGravatar Kristina Chodorow <kchodorow@google.com>2017-05-09 10:52:36 -0400
commit1f75476f7a2f1abb449d538ef865f51ac138d013 (patch)
tree7335b500ba5bb34b5f97fb4cd30917376a38810e /src/main
parentcc0f6a62d5262760d34937547470a213f6bd5b72 (diff)
Clean up AbstractQueueVisitor's constructors.
The "concurrent" bit was supposedly around for testing purposes, but who knows if it even works anymore. Making other callsites explicitly state their ErrorClassifier gets us down to two constructors, one of which can delegate to the other. I think having both these constructors is useful because there's a linkage between creating a new executor service and specifying that the AQV should shut down the service at the end of the visitation. And using a static create() method doesn't work because of AQV's inheritance model. PiperOrigin-RevId: 155406771
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java198
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java1
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java12
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java1
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java1
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java2
6 files changed, 32 insertions, 183 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 98e9efba08..f82ead9512 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.annotation.Nullable;
/** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */
public class AbstractQueueVisitor implements QuiescingExecutor {
@@ -86,22 +85,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private volatile Throwable catastrophe;
/**
- * Enables concurrency. For debugging or testing, set this to {@code false} to avoid thread
- * creation and concurrency. Any deviation in observed behaviour is a bug.
- */
- private final boolean concurrent;
-
- /**
* An object used in the manner of a {@link java.util.concurrent.locks.Condition} object, for the
* condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}.
* TODO(bazel-team): Replace with an actual {@link java.util.concurrent.locks.Condition} object.
*/
private final Object zeroRemainingTasks = new Object();
- /**
- * If {@link #concurrent} is {@code true}, then this is a counter of the number of {@link
- * Runnable}s {@link #execute}-d that have not finished evaluation.
- */
+ /** The number of {@link Runnable}s {@link #execute}-d that have not finished evaluation. */
private final AtomicLong remainingTasks = new AtomicLong(0);
/**
@@ -116,8 +106,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
/** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */
private final AtomicLongMap<Thread> jobs = AtomicLongMap.create();
- /** The {@link ExecutorService}. If !{@code concurrent}, this may be {@code null}. */
- @Nullable private final ExecutorService executorService;
+ private final ExecutorService executorService;
/**
* Flag used to record when the main thread (the thread which called {@link #awaitQuiescence})
@@ -145,77 +134,24 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName());
- /**
- * Create the {@link AbstractQueueVisitor}.
- *
- * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for
- * debugging.
- * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
- * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
- * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable.
- * @param units the time units of keepAliveTime.
- * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
- * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
- * null}, default thread naming will be used.
- */
- public AbstractQueueVisitor(
- boolean concurrent,
+ private static ExecutorService createExecutorService(
int parallelism,
long keepAliveTime,
TimeUnit units,
- boolean failFastOnException,
- String poolName) {
- this(
- concurrent,
- parallelism,
- keepAliveTime,
- units,
- failFastOnException,
- poolName,
- EXECUTOR_FACTORY,
- ErrorClassifier.DEFAULT);
- }
-
- /**
- * Create the {@link AbstractQueueVisitor}.
- *
- * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for
- * debugging.
- * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
- * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code corePoolSize} and
- * {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable.
- * @param units the time units of keepAliveTime.
- * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
- * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
- * null}, default thread naming will be used.
- * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
- */
- public AbstractQueueVisitor(
- boolean concurrent,
- int parallelism,
- long keepAliveTime,
- TimeUnit units,
- boolean failFastOnException,
String poolName,
- ErrorClassifier errorClassifier) {
- this(
- concurrent,
- parallelism,
- keepAliveTime,
- units,
- failFastOnException,
- poolName,
- EXECUTOR_FACTORY,
- errorClassifier);
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
+ return Preconditions.checkNotNull(executorFactory)
+ .apply(
+ new ExecutorParams(
+ parallelism,
+ keepAliveTime,
+ units,
+ Preconditions.checkNotNull(poolName),
+ new BlockingStack<Runnable>()));
}
-
/**
* Create the {@link AbstractQueueVisitor}.
*
- * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for
- * debugging.
* @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
* parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code corePoolSize} and
* {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
@@ -224,12 +160,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
* @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
* null}, default thread naming will be used.
- * @param executorFactory the factory for constructing the executor service if {@code concurrent}
- * is {@code true}.
+ * @param executorFactory the factory for constructing the executor service.
* @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
*/
public AbstractQueueVisitor(
- boolean concurrent,
int parallelism,
long keepAliveTime,
TimeUnit units,
@@ -237,71 +171,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
String poolName,
Function<ExecutorParams, ? extends ExecutorService> executorFactory,
ErrorClassifier errorClassifier) {
- Preconditions.checkNotNull(poolName);
- Preconditions.checkNotNull(executorFactory);
- Preconditions.checkNotNull(errorClassifier);
- this.concurrent = concurrent;
- this.failFastOnException = failFastOnException;
- this.ownExecutorService = true;
- this.executorService =
- concurrent
- ? executorFactory.apply(
- new ExecutorParams(
- parallelism, keepAliveTime, units, poolName, new BlockingStack<Runnable>()))
- : null;
- this.errorClassifier = errorClassifier;
- }
-
- /**
- * Create the {@link AbstractQueueVisitor}.
- *
- * @param executorService The {@link ExecutorService} to use.
- * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to
- * this class. The service will be shut down after a
- * call to {@link #awaitQuiescence}. Callers must not shutdown the
- * {@link ExecutorService} while queue visitors use it.
- * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
- */
- public AbstractQueueVisitor(
- ExecutorService executorService, boolean shutdownOnCompletion, boolean failFastOnException) {
this(
- /*concurrent=*/ true,
- executorService,
- shutdownOnCompletion,
+ createExecutorService(parallelism, keepAliveTime, units, poolName, executorFactory),
+ true,
failFastOnException,
- ErrorClassifier.DEFAULT);
- }
-
- /**
- * Create the {@link AbstractQueueVisitor}.
- *
- * @param concurrent if {@code false}, run tasks inline instead of using the {@link
- * ExecutorService}.
- * @param executorService The {@link ExecutorService} to use.
- * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to
- * this class. The service will be shut down after a
- * call to {@link #awaitQuiescence}. Callers must not shut down the
- * {@link ExecutorService} while queue visitors use it.
- * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
- */
- public AbstractQueueVisitor(
- boolean concurrent,
- ExecutorService executorService,
- boolean shutdownOnCompletion,
- boolean failFastOnException) {
- Preconditions.checkArgument(executorService != null || !concurrent);
- this.concurrent = concurrent;
- this.failFastOnException = failFastOnException;
- this.ownExecutorService = shutdownOnCompletion;
- this.executorService = executorService;
- this.errorClassifier = ErrorClassifier.DEFAULT;
+ errorClassifier);
}
/**
* Create the AbstractQueueVisitor.
*
- * @param concurrent if {@code false}, run tasks inline instead of using the {@link
- * ExecutorService}.
* @param executorService The {@link ExecutorService} to use.
* @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to
* this class. The service will be shut down after a call to {@link #awaitQuiescence}. Callers
@@ -309,56 +188,23 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
* @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
*/
- public AbstractQueueVisitor(
- boolean concurrent,
+ protected AbstractQueueVisitor(
ExecutorService executorService,
boolean shutdownOnCompletion,
boolean failFastOnException,
ErrorClassifier errorClassifier) {
- Preconditions.checkArgument(executorService != null || !concurrent);
- this.concurrent = concurrent;
this.failFastOnException = failFastOnException;
this.ownExecutorService = shutdownOnCompletion;
- this.executorService = executorService;
- this.errorClassifier = errorClassifier;
+ this.executorService = Preconditions.checkNotNull(executorService);
+ this.errorClassifier = Preconditions.checkNotNull(errorClassifier);
}
- /**
- * Create the {@code AbstractQueueVisitor} with concurrency enabled.
- *
- * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
- * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
- * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAlive the keep-alive time for the {@link ExecutorService}, if applicable.
- * @param units the time units of keepAliveTime.
- * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
- * null}, default thread naming will be used.
- */
- public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) {
- this(
- true,
- parallelism,
- keepAlive,
- units,
- false,
- poolName,
- EXECUTOR_FACTORY,
- ErrorClassifier.DEFAULT);
- }
-
-
@Override
public final void awaitQuiescence(boolean interruptWorkers) throws InterruptedException {
- if (concurrent) {
- awaitTermination(interruptWorkers);
- } else {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException();
- }
- }
+ awaitTermination(interruptWorkers);
}
- /** Schedules a call. Called in a worker thread if concurrent. */
+ /** Schedules a call. Called in a worker thread. */
@Override
public final void execute(Runnable runnable) {
if (runConcurrently()) {
@@ -391,7 +237,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* asynchronously versus in-thread.
*/
protected boolean runConcurrently() {
- return concurrent;
+ return true;
}
/**
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
index ca2b38da44..295e758628 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
@@ -28,7 +28,6 @@ public class ForkJoinQuiescingExecutor extends AbstractQueueVisitor {
private ForkJoinQuiescingExecutor(
ForkJoinPool forkJoinPool, ErrorClassifier errorClassifier, boolean shutdownOnCompletion) {
super(
- /*concurrent=*/ true,
forkJoinPool,
shutdownOnCompletion,
/*failFastOnException=*/ true,
diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
index fc51c9a9d6..d3b2c140f4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.packages.AggregatingAttributeMapper;
@@ -240,8 +241,6 @@ final class LabelVisitor {
private final Iterable<TargetEdgeObserver> observers;
private final TargetEdgeErrorObserver errorObserver;
private final AtomicBoolean stopNewActions = new AtomicBoolean(false);
- private static final boolean CONCURRENT = true;
-
public Visitor(
ExtendedEventHandler eventHandler,
@@ -252,7 +251,14 @@ final class LabelVisitor {
// Observing the loading phase of a typical large package (with all subpackages) shows
// maximum thread-level concurrency of ~20. Limiting the total number of threads to 200 is
// therefore conservative and should help us avoid hitting native limits.
- super(CONCURRENT, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing, THREAD_NAME);
+ super(
+ parallelThreads,
+ 1L,
+ TimeUnit.SECONDS,
+ !keepGoing,
+ THREAD_NAME,
+ AbstractQueueVisitor.EXECUTOR_FACTORY,
+ ErrorClassifier.DEFAULT);
this.eventHandler = eventHandler;
this.maxDepth = maxDepth;
this.errorObserver = new TargetEdgeErrorObserver();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 0052892130..07249b50fb 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -549,7 +549,6 @@ class ParallelSkyQueryUtils {
private class BFSVisitingTaskExecutor extends AbstractQueueVisitor {
private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) {
super(
- /*concurrent=*/ true,
/*executorService=*/ executor,
// Leave the thread pool active for other current and future callers.
/*shutdownOnCompletion=*/ false,
diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
index d7bb2d2cab..30314f5fa7 100644
--- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java
@@ -100,7 +100,6 @@ public abstract class InvalidatingNodeVisitor<TGraph extends QueryableGraph> {
Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
this.executor =
new AbstractQueueVisitor(
- /*concurrent=*/ true,
/*parallelism=*/ DEFAULT_THREAD_COUNT,
/*keepAliveTime=*/ 1,
/*units=*/ TimeUnit.SECONDS,
diff --git a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
index 722410e77b..d528a830cd 100644
--- a/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
+++ b/src/main/java/com/google/devtools/build/skyframe/NodeEntryVisitor.java
@@ -78,12 +78,12 @@ class NodeEntryVisitor {
Function<SkyKey, Runnable> runnableMaker) {
quiescingExecutor =
new AbstractQueueVisitor(
- /*concurrent*/ true,
threadCount,
/*keepAliveTime=*/ 1,
TimeUnit.SECONDS,
/*failFastOnException*/ true,
"skyframe-evaluator",
+ AbstractQueueVisitor.EXECUTOR_FACTORY,
NODE_ENTRY_VISITOR_ERROR_CLASSIFIER);
this.progressReceiver = progressReceiver;
this.runnableMaker = runnableMaker;