aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2017-03-01 19:37:55 +0000
committerGravatar Yue Gan <yueg@google.com>2017-03-02 13:31:56 +0000
commit33093ca444d979dfcd81a56655d4e5dbbd1b35e9 (patch)
tree1eb0ba1f4a70106df3a66a3898054457fe10da9d /src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
parent83ca6a6befc5075de9a1d48532e9f08198511145 (diff)
Do not use additional scheduling threads during parallel evaluation to prevent thread starvation
This change gets rid of the additional thread needed for task scheduling during BFS visitation, which eliminates the possibility of thread starvation while a single thread pool is used for multiple concurrent evaluations. -- PiperOrigin-RevId: 148911346 MOS_MIGRATED_REVID=148911346
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java42
1 files changed, 24 insertions, 18 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 45d1c56c15..9bbb2af1eb 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -108,9 +108,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* Flag used to record when all threads were killed by failed action execution. Only ever
* transitions from {@code false} to {@code true}.
*
- * <p>May only be accessed in a block that is synchronized on {@link #zeroRemainingTasks}.
+ * <p>Except for {@link #mustJobsBeStopped}, may only be accessed in a block that is synchronized
+ * on {@link #zeroRemainingTasks}.
*/
- private boolean jobsMustBeStopped = false;
+ private volatile boolean jobsMustBeStopped = false;
/** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */
private final AtomicLongMap<Thread> jobs = AtomicLongMap.create();
@@ -142,7 +143,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private final ErrorClassifier errorClassifier;
- private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName());
+ private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName());
/**
* Create the {@link AbstractQueueVisitor}.
@@ -385,11 +386,6 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- @Override
- public long getRemainingTasksCount() {
- return remainingTasks.get();
- }
-
/**
* Subclasses may override this to make dynamic decisions about whether to run tasks
* asynchronously versus in-thread.
@@ -415,10 +411,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
ErrorClassification errorClassification = errorClassifier.classify(e);
switch (errorClassification) {
case AS_CRITICAL_AS_POSSIBLE:
- case CRITICAL_AND_LOG:
- critical = true;
- LOG.log(Level.WARNING, "Found critical error in queue visitor", e);
- break;
+ case CRITICAL_AND_LOG:
+ critical = true;
+ logger.log(Level.WARNING, "Found critical error in queue visitor", e);
+ break;
case CRITICAL:
critical = true;
break;
@@ -524,7 +520,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/** Set an internal flag to show that an interrupt was detected. */
- private void setInterrupted() {
+ protected final void setInterrupted() {
threadInterrupted = true;
}
@@ -568,17 +564,27 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* Get number of jobs remaining. Note that this can increase in value if running tasks submit
* further jobs.
*/
- @VisibleForTesting
protected final long getTaskCount() {
return remainingTasks.get();
}
/**
- * Waits for the task queue to drain, then shuts down the {@link ExecutorService} and
- * waits for it to terminate. Throws (the same) unchecked exception if any
- * worker thread failed unexpectedly.
+ * Whether all running and pending jobs will be stopped or cancelled. Also newly submitted tasks
+ * will be rejected if this is true.
+ *
+ * <p>This function returns the CURRENT state of whether jobs should be stopped. If the value is
+ * false right now, it may be changed to true by another thread later.
+ */
+ protected final boolean mustJobsBeStopped() {
+ return jobsMustBeStopped;
+ }
+
+ /**
+ * Waits for the task queue to drain. Then if {@code ownExecutorService} is true, shuts down the
+ * {@link ExecutorService} and waits for it to terminate. Throws (the same) unchecked exception if
+ * any worker thread failed unexpectedly.
*/
- private void awaitTermination(boolean interruptWorkers) throws InterruptedException {
+ protected final void awaitTermination(boolean interruptWorkers) throws InterruptedException {
Throwables.propagateIfPossible(catastrophe);
try {
synchronized (zeroRemainingTasks) {