aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2017-03-01 19:37:55 +0000
committerGravatar Yue Gan <yueg@google.com>2017-03-02 13:31:56 +0000
commit33093ca444d979dfcd81a56655d4e5dbbd1b35e9 (patch)
tree1eb0ba1f4a70106df3a66a3898054457fe10da9d
parent83ca6a6befc5075de9a1d48532e9f08198511145 (diff)
Do not use additional scheduling threads during parallel evaluation to prevent thread starvation
This change gets rid of the additional thread needed for task scheduling during BFS visitation, which eliminates the possibility of thread starvation while a single thread pool is used for multiple concurrent evaluations. -- PiperOrigin-RevId: 148911346 MOS_MIGRATED_REVID=148911346
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java42
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java3
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java134
3 files changed, 97 insertions, 82 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 45d1c56c15..9bbb2af1eb 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -108,9 +108,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* Flag used to record when all threads were killed by failed action execution. Only ever
* transitions from {@code false} to {@code true}.
*
- * <p>May only be accessed in a block that is synchronized on {@link #zeroRemainingTasks}.
+ * <p>Except for {@link #mustJobsBeStopped}, may only be accessed in a block that is synchronized
+ * on {@link #zeroRemainingTasks}.
*/
- private boolean jobsMustBeStopped = false;
+ private volatile boolean jobsMustBeStopped = false;
/** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */
private final AtomicLongMap<Thread> jobs = AtomicLongMap.create();
@@ -142,7 +143,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private final ErrorClassifier errorClassifier;
- private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName());
+ private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName());
/**
* Create the {@link AbstractQueueVisitor}.
@@ -385,11 +386,6 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- @Override
- public long getRemainingTasksCount() {
- return remainingTasks.get();
- }
-
/**
* Subclasses may override this to make dynamic decisions about whether to run tasks
* asynchronously versus in-thread.
@@ -415,10 +411,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
ErrorClassification errorClassification = errorClassifier.classify(e);
switch (errorClassification) {
case AS_CRITICAL_AS_POSSIBLE:
- case CRITICAL_AND_LOG:
- critical = true;
- LOG.log(Level.WARNING, "Found critical error in queue visitor", e);
- break;
+ case CRITICAL_AND_LOG:
+ critical = true;
+ logger.log(Level.WARNING, "Found critical error in queue visitor", e);
+ break;
case CRITICAL:
critical = true;
break;
@@ -524,7 +520,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/** Set an internal flag to show that an interrupt was detected. */
- private void setInterrupted() {
+ protected final void setInterrupted() {
threadInterrupted = true;
}
@@ -568,17 +564,27 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* Get number of jobs remaining. Note that this can increase in value if running tasks submit
* further jobs.
*/
- @VisibleForTesting
protected final long getTaskCount() {
return remainingTasks.get();
}
/**
- * Waits for the task queue to drain, then shuts down the {@link ExecutorService} and
- * waits for it to terminate. Throws (the same) unchecked exception if any
- * worker thread failed unexpectedly.
+ * Whether all running and pending jobs will be stopped or cancelled. Also newly submitted tasks
+ * will be rejected if this is true.
+ *
+ * <p>This function returns the CURRENT state of whether jobs should be stopped. If the value is
+ * false right now, it may be changed to true by another thread later.
+ */
+ protected final boolean mustJobsBeStopped() {
+ return jobsMustBeStopped;
+ }
+
+ /**
+ * Waits for the task queue to drain. Then if {@code ownExecutorService} is true, shuts down the
+ * {@link ExecutorService} and waits for it to terminate. Throws (the same) unchecked exception if
+ * any worker thread failed unexpectedly.
*/
- private void awaitTermination(boolean interruptWorkers) throws InterruptedException {
+ protected final void awaitTermination(boolean interruptWorkers) throws InterruptedException {
Throwables.propagateIfPossible(catastrophe);
try {
synchronized (zeroRemainingTasks) {
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
index 78bc93dc10..3424bb4adf 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
@@ -52,9 +52,6 @@ public interface QuiescingExecutor extends Executor {
*/
void awaitQuiescence(boolean interruptWorkers) throws InterruptedException;
- /** Return the number of tasks which are not completed (running or waiting to be executed). */
- long getRemainingTasksCount();
-
/** Get latch that is released if a task throws an exception. Used only in tests. */
@VisibleForTesting
CountDownLatch getExceptionLatchForTestingOnly();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 30a27f8e63..95486c6ef9 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -373,7 +373,7 @@ class ParallelSkyQueryUtils {
private final ThreadSafeUniquifier<T> uniquifier;
private final Callback<Target> callback;
- private final QuiescingExecutor executor;
+ private final BFSVisitingTaskExecutor executor;
/** A queue to store pending visits. */
private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>();
@@ -439,13 +439,8 @@ class ParallelSkyQueryUtils {
this.uniquifier = uniquifier;
this.callback = callback;
this.executor =
- new AbstractQueueVisitor(
- /*concurrent=*/ true,
- /*executorService=*/ FIXED_THREAD_POOL_EXECUTOR,
- // Leave the thread pool active for other current and future callers.
- /*shutdownOnCompletion=*/ false,
- /*failFastOnException=*/ true,
- /*errorClassifier=*/ SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER);
+ new BFSVisitingTaskExecutor(
+ FIXED_THREAD_POOL_EXECUTOR, SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER);
}
/** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */
@@ -466,16 +461,7 @@ class ParallelSkyQueryUtils {
void visitAndWaitForCompletion(Iterable<SkyKey> keys)
throws QueryException, InterruptedException {
processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys)));
- // We add the scheduler to the pool, allowing it (as well as any submitted tasks later)
- // to be failed fast if any QueryException or InterruptedException is received.
- executor.execute(new Scheduler());
- try {
- executor.awaitQuiescence(true);
- } catch (RuntimeQueryException e) {
- throw (QueryException) e.getCause();
- } catch (RuntimeInterruptedException e) {
- throw (InterruptedException) e.getCause();
- }
+ executor.bfsVisitAndWaitForCompletion();
}
/**
@@ -502,49 +488,6 @@ class ParallelSkyQueryUtils {
return builder.build();
}
- private class Scheduler implements Runnable {
- @Override
- public void run() {
- // The scheduler keeps running until both the following two conditions are met.
- //
- // 1. There is no pending visit in the queue.
- // 2. There is no pending task (other than itself) in the pool.
- if (processingQueue.isEmpty() && executor.getRemainingTasksCount() <= 1) {
- return;
- }
-
- // To achieve maximum efficiency, queue is drained in either of the following 2 conditions:
- //
- // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
- // 2. The process queue size is large.
- if (executor.getRemainingTasksCount() < MIN_PENDING_TASKS
- || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
- drainProcessingQueue();
- }
-
- try {
- // Wait at most {@code SCHEDULING_INTERVAL_MILLISECONDS} milliseconds.
- Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
- } catch (InterruptedException e) {
- throw new RuntimeInterruptedException(e);
- }
-
- executor.execute(new Scheduler());
- }
-
- private void drainProcessingQueue() {
- Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
- processingQueue.drainTo(pendingKeysToVisit);
- if (pendingKeysToVisit.isEmpty()) {
- return;
- }
-
- for (Task task : getVisitTasks(pendingKeysToVisit)) {
- executor.execute(task);
- }
- }
- }
-
abstract static class Task implements Runnable {
@Override
@@ -598,6 +541,75 @@ class ParallelSkyQueryUtils {
processResultantTargets(keysToUseForResult, callback);
}
}
+
+ /**
+ * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and
+ * scheduler for parallel BFS visitations.
+ */
+ private class BFSVisitingTaskExecutor extends AbstractQueueVisitor {
+ private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) {
+ super(
+ /*concurrent=*/ true,
+ /*executorService=*/ executor,
+ // Leave the thread pool active for other current and future callers.
+ /*shutdownOnCompletion=*/ false,
+ /*failFastOnException=*/ true,
+ /*errorClassifier=*/ errorClassifier);
+ }
+
+ private void bfsVisitAndWaitForCompletion() throws QueryException, InterruptedException {
+ // The scheduler keeps running until either of the following two conditions is met.
+ //
+ // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail
+ // fast.
+ // 2. There is no pending visit in the queue and no pending task running.
+ while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) {
+ // To achieve maximum efficiency, queue is drained in either of the following two
+ // conditions:
+ //
+ // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
+ // 2. The processing queue size is large.
+ if (getTaskCount() < MIN_PENDING_TASKS
+ || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
+
+ Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
+ processingQueue.drainTo(pendingKeysToVisit);
+ for (Task task : getVisitTasks(pendingKeysToVisit)) {
+ execute(task);
+ }
+ }
+
+ try {
+ Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
+ } catch (InterruptedException e) {
+ // If the main thread waiting for completion of the visitation is interrupted, we should
+ // gracefully terminate all running and pending tasks before exit. If QueryException
+ // occurred in any of the worker threads, awaitTerminationAndPropagateErrorsIfAny
+ // propagates the QueryException instead of InterruptedException.
+ setInterrupted();
+ awaitTerminationAndPropagateErrorsIfAny();
+ throw e;
+ }
+ }
+
+ // We reach here either because the visitation is complete, or because an error prevents us
+ // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either
+ // gracefully exit if the visitation is complete, or propagate the exception if an error
+ // occurred.
+ awaitTerminationAndPropagateErrorsIfAny();
+ }
+
+ private void awaitTerminationAndPropagateErrorsIfAny()
+ throws QueryException, InterruptedException {
+ try {
+ awaitTermination(/*interruptWorkers=*/ true);
+ } catch (RuntimeQueryException e) {
+ throw (QueryException) e.getCause();
+ } catch (RuntimeInterruptedException e) {
+ throw (InterruptedException) e.getCause();
+ }
+ }
+ }
}
private static class RuntimeQueryException extends RuntimeException {