aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java198
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java1
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java12
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java1
4 files changed, 31 insertions, 181 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 98e9efba08..f82ead9512 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.annotation.Nullable;
/** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */
public class AbstractQueueVisitor implements QuiescingExecutor {
@@ -86,22 +85,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private volatile Throwable catastrophe;
/**
- * Enables concurrency. For debugging or testing, set this to {@code false} to avoid thread
- * creation and concurrency. Any deviation in observed behaviour is a bug.
- */
- private final boolean concurrent;
-
- /**
* An object used in the manner of a {@link java.util.concurrent.locks.Condition} object, for the
* condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}.
* TODO(bazel-team): Replace with an actual {@link java.util.concurrent.locks.Condition} object.
*/
private final Object zeroRemainingTasks = new Object();
- /**
- * If {@link #concurrent} is {@code true}, then this is a counter of the number of {@link
- * Runnable}s {@link #execute}-d that have not finished evaluation.
- */
+ /** The number of {@link Runnable}s {@link #execute}-d that have not finished evaluation. */
private final AtomicLong remainingTasks = new AtomicLong(0);
/**
@@ -116,8 +106,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
/** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */
private final AtomicLongMap<Thread> jobs = AtomicLongMap.create();
- /** The {@link ExecutorService}. If !{@code concurrent}, this may be {@code null}. */
- @Nullable private final ExecutorService executorService;
+ private final ExecutorService executorService;
/**
* Flag used to record when the main thread (the thread which called {@link #awaitQuiescence})
@@ -145,77 +134,24 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName());
- /**
- * Create the {@link AbstractQueueVisitor}.
- *
- * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for
- * debugging.
- * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
- * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
- * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable.
- * @param units the time units of keepAliveTime.
- * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
- * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
- * null}, default thread naming will be used.
- */
- public AbstractQueueVisitor(
- boolean concurrent,
+ private static ExecutorService createExecutorService(
int parallelism,
long keepAliveTime,
TimeUnit units,
- boolean failFastOnException,
- String poolName) {
- this(
- concurrent,
- parallelism,
- keepAliveTime,
- units,
- failFastOnException,
- poolName,
- EXECUTOR_FACTORY,
- ErrorClassifier.DEFAULT);
- }
-
- /**
- * Create the {@link AbstractQueueVisitor}.
- *
- * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for
- * debugging.
- * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
- * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code corePoolSize} and
- * {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAliveTime the keep-alive time for the {@link ExecutorService}, if applicable.
- * @param units the time units of keepAliveTime.
- * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
- * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
- * null}, default thread naming will be used.
- * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
- */
- public AbstractQueueVisitor(
- boolean concurrent,
- int parallelism,
- long keepAliveTime,
- TimeUnit units,
- boolean failFastOnException,
String poolName,
- ErrorClassifier errorClassifier) {
- this(
- concurrent,
- parallelism,
- keepAliveTime,
- units,
- failFastOnException,
- poolName,
- EXECUTOR_FACTORY,
- errorClassifier);
+ Function<ExecutorParams, ? extends ExecutorService> executorFactory) {
+ return Preconditions.checkNotNull(executorFactory)
+ .apply(
+ new ExecutorParams(
+ parallelism,
+ keepAliveTime,
+ units,
+ Preconditions.checkNotNull(poolName),
+ new BlockingStack<Runnable>()));
}
-
/**
* Create the {@link AbstractQueueVisitor}.
*
- * @param concurrent {@code true} if concurrency should be enabled. Only set to {@code false} for
- * debugging.
* @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
* parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code corePoolSize} and
* {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
@@ -224,12 +160,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
* @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
* null}, default thread naming will be used.
- * @param executorFactory the factory for constructing the executor service if {@code concurrent}
- * is {@code true}.
+ * @param executorFactory the factory for constructing the executor service.
* @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
*/
public AbstractQueueVisitor(
- boolean concurrent,
int parallelism,
long keepAliveTime,
TimeUnit units,
@@ -237,71 +171,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
String poolName,
Function<ExecutorParams, ? extends ExecutorService> executorFactory,
ErrorClassifier errorClassifier) {
- Preconditions.checkNotNull(poolName);
- Preconditions.checkNotNull(executorFactory);
- Preconditions.checkNotNull(errorClassifier);
- this.concurrent = concurrent;
- this.failFastOnException = failFastOnException;
- this.ownExecutorService = true;
- this.executorService =
- concurrent
- ? executorFactory.apply(
- new ExecutorParams(
- parallelism, keepAliveTime, units, poolName, new BlockingStack<Runnable>()))
- : null;
- this.errorClassifier = errorClassifier;
- }
-
- /**
- * Create the {@link AbstractQueueVisitor}.
- *
- * @param executorService The {@link ExecutorService} to use.
- * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to
- * this class. The service will be shut down after a
- * call to {@link #awaitQuiescence}. Callers must not shutdown the
- * {@link ExecutorService} while queue visitors use it.
- * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
- */
- public AbstractQueueVisitor(
- ExecutorService executorService, boolean shutdownOnCompletion, boolean failFastOnException) {
this(
- /*concurrent=*/ true,
- executorService,
- shutdownOnCompletion,
+ createExecutorService(parallelism, keepAliveTime, units, poolName, executorFactory),
+ true,
failFastOnException,
- ErrorClassifier.DEFAULT);
- }
-
- /**
- * Create the {@link AbstractQueueVisitor}.
- *
- * @param concurrent if {@code false}, run tasks inline instead of using the {@link
- * ExecutorService}.
- * @param executorService The {@link ExecutorService} to use.
- * @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to
- * this class. The service will be shut down after a
- * call to {@link #awaitQuiescence}. Callers must not shut down the
- * {@link ExecutorService} while queue visitors use it.
- * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
- */
- public AbstractQueueVisitor(
- boolean concurrent,
- ExecutorService executorService,
- boolean shutdownOnCompletion,
- boolean failFastOnException) {
- Preconditions.checkArgument(executorService != null || !concurrent);
- this.concurrent = concurrent;
- this.failFastOnException = failFastOnException;
- this.ownExecutorService = shutdownOnCompletion;
- this.executorService = executorService;
- this.errorClassifier = ErrorClassifier.DEFAULT;
+ errorClassifier);
}
/**
* Create the AbstractQueueVisitor.
*
- * @param concurrent if {@code false}, run tasks inline instead of using the {@link
- * ExecutorService}.
* @param executorService The {@link ExecutorService} to use.
* @param shutdownOnCompletion If {@code true}, pass ownership of the {@link ExecutorService} to
* this class. The service will be shut down after a call to {@link #awaitQuiescence}. Callers
@@ -309,56 +188,23 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* @param failFastOnException if {@code true}, don't run new actions after an uncaught exception.
* @param errorClassifier an error classifier used to determine whether to log and/or stop jobs.
*/
- public AbstractQueueVisitor(
- boolean concurrent,
+ protected AbstractQueueVisitor(
ExecutorService executorService,
boolean shutdownOnCompletion,
boolean failFastOnException,
ErrorClassifier errorClassifier) {
- Preconditions.checkArgument(executorService != null || !concurrent);
- this.concurrent = concurrent;
this.failFastOnException = failFastOnException;
this.ownExecutorService = shutdownOnCompletion;
- this.executorService = executorService;
- this.errorClassifier = errorClassifier;
+ this.executorService = Preconditions.checkNotNull(executorService);
+ this.errorClassifier = Preconditions.checkNotNull(errorClassifier);
}
- /**
- * Create the {@code AbstractQueueVisitor} with concurrency enabled.
- *
- * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code
- * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code
- * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}.
- * @param keepAlive the keep-alive time for the {@link ExecutorService}, if applicable.
- * @param units the time units of keepAliveTime.
- * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code
- * null}, default thread naming will be used.
- */
- public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) {
- this(
- true,
- parallelism,
- keepAlive,
- units,
- false,
- poolName,
- EXECUTOR_FACTORY,
- ErrorClassifier.DEFAULT);
- }
-
-
@Override
public final void awaitQuiescence(boolean interruptWorkers) throws InterruptedException {
- if (concurrent) {
- awaitTermination(interruptWorkers);
- } else {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException();
- }
- }
+ awaitTermination(interruptWorkers);
}
- /** Schedules a call. Called in a worker thread if concurrent. */
+ /** Schedules a call. Called in a worker thread. */
@Override
public final void execute(Runnable runnable) {
if (runConcurrently()) {
@@ -391,7 +237,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* asynchronously versus in-thread.
*/
protected boolean runConcurrently() {
- return concurrent;
+ return true;
}
/**
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
index ca2b38da44..295e758628 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
@@ -28,7 +28,6 @@ public class ForkJoinQuiescingExecutor extends AbstractQueueVisitor {
private ForkJoinQuiescingExecutor(
ForkJoinPool forkJoinPool, ErrorClassifier errorClassifier, boolean shutdownOnCompletion) {
super(
- /*concurrent=*/ true,
forkJoinPool,
shutdownOnCompletion,
/*failFastOnException=*/ true,
diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
index fc51c9a9d6..d3b2c140f4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.packages.AggregatingAttributeMapper;
@@ -240,8 +241,6 @@ final class LabelVisitor {
private final Iterable<TargetEdgeObserver> observers;
private final TargetEdgeErrorObserver errorObserver;
private final AtomicBoolean stopNewActions = new AtomicBoolean(false);
- private static final boolean CONCURRENT = true;
-
public Visitor(
ExtendedEventHandler eventHandler,
@@ -252,7 +251,14 @@ final class LabelVisitor {
// Observing the loading phase of a typical large package (with all subpackages) shows
// maximum thread-level concurrency of ~20. Limiting the total number of threads to 200 is
// therefore conservative and should help us avoid hitting native limits.
- super(CONCURRENT, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing, THREAD_NAME);
+ super(
+ parallelThreads,
+ 1L,
+ TimeUnit.SECONDS,
+ !keepGoing,
+ THREAD_NAME,
+ AbstractQueueVisitor.EXECUTOR_FACTORY,
+ ErrorClassifier.DEFAULT);
this.eventHandler = eventHandler;
this.maxDepth = maxDepth;
this.errorObserver = new TargetEdgeErrorObserver();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 0052892130..07249b50fb 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -549,7 +549,6 @@ class ParallelSkyQueryUtils {
private class BFSVisitingTaskExecutor extends AbstractQueueVisitor {
private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) {
super(
- /*concurrent=*/ true,
/*executorService=*/ executor,
// Leave the thread pool active for other current and future callers.
/*shutdownOnCompletion=*/ false,