aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java42
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java3
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java134
3 files changed, 97 insertions, 82 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 45d1c56c15..9bbb2af1eb 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -108,9 +108,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* Flag used to record when all threads were killed by failed action execution. Only ever
* transitions from {@code false} to {@code true}.
*
- * <p>May only be accessed in a block that is synchronized on {@link #zeroRemainingTasks}.
+ * <p>Except for {@link #mustJobsBeStopped}, may only be accessed in a block that is synchronized
+ * on {@link #zeroRemainingTasks}.
*/
- private boolean jobsMustBeStopped = false;
+ private volatile boolean jobsMustBeStopped = false;
/** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */
private final AtomicLongMap<Thread> jobs = AtomicLongMap.create();
@@ -142,7 +143,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private final ErrorClassifier errorClassifier;
- private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName());
+ private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName());
/**
* Create the {@link AbstractQueueVisitor}.
@@ -385,11 +386,6 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
- @Override
- public long getRemainingTasksCount() {
- return remainingTasks.get();
- }
-
/**
* Subclasses may override this to make dynamic decisions about whether to run tasks
* asynchronously versus in-thread.
@@ -415,10 +411,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
ErrorClassification errorClassification = errorClassifier.classify(e);
switch (errorClassification) {
case AS_CRITICAL_AS_POSSIBLE:
- case CRITICAL_AND_LOG:
- critical = true;
- LOG.log(Level.WARNING, "Found critical error in queue visitor", e);
- break;
+ case CRITICAL_AND_LOG:
+ critical = true;
+ logger.log(Level.WARNING, "Found critical error in queue visitor", e);
+ break;
case CRITICAL:
critical = true;
break;
@@ -524,7 +520,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
/** Set an internal flag to show that an interrupt was detected. */
- private void setInterrupted() {
+ protected final void setInterrupted() {
threadInterrupted = true;
}
@@ -568,17 +564,27 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* Get number of jobs remaining. Note that this can increase in value if running tasks submit
* further jobs.
*/
- @VisibleForTesting
protected final long getTaskCount() {
return remainingTasks.get();
}
/**
- * Waits for the task queue to drain, then shuts down the {@link ExecutorService} and
- * waits for it to terminate. Throws (the same) unchecked exception if any
- * worker thread failed unexpectedly.
+ * Whether all running and pending jobs will be stopped or cancelled. Also newly submitted tasks
+ * will be rejected if this is true.
+ *
+ * <p>This function returns the CURRENT state of whether jobs should be stopped. If the value is
+ * false right now, it may be changed to true by another thread later.
+ */
+ protected final boolean mustJobsBeStopped() {
+ return jobsMustBeStopped;
+ }
+
+ /**
+ * Waits for the task queue to drain. Then if {@code ownExecutorService} is true, shuts down the
+ * {@link ExecutorService} and waits for it to terminate. Throws (the same) unchecked exception if
+ * any worker thread failed unexpectedly.
*/
- private void awaitTermination(boolean interruptWorkers) throws InterruptedException {
+ protected final void awaitTermination(boolean interruptWorkers) throws InterruptedException {
Throwables.propagateIfPossible(catastrophe);
try {
synchronized (zeroRemainingTasks) {
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
index 78bc93dc10..3424bb4adf 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
@@ -52,9 +52,6 @@ public interface QuiescingExecutor extends Executor {
*/
void awaitQuiescence(boolean interruptWorkers) throws InterruptedException;
- /** Return the number of tasks which are not completed (running or waiting to be executed). */
- long getRemainingTasksCount();
-
/** Get latch that is released if a task throws an exception. Used only in tests. */
@VisibleForTesting
CountDownLatch getExceptionLatchForTestingOnly();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 30a27f8e63..95486c6ef9 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -373,7 +373,7 @@ class ParallelSkyQueryUtils {
private final ThreadSafeUniquifier<T> uniquifier;
private final Callback<Target> callback;
- private final QuiescingExecutor executor;
+ private final BFSVisitingTaskExecutor executor;
/** A queue to store pending visits. */
private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>();
@@ -439,13 +439,8 @@ class ParallelSkyQueryUtils {
this.uniquifier = uniquifier;
this.callback = callback;
this.executor =
- new AbstractQueueVisitor(
- /*concurrent=*/ true,
- /*executorService=*/ FIXED_THREAD_POOL_EXECUTOR,
- // Leave the thread pool active for other current and future callers.
- /*shutdownOnCompletion=*/ false,
- /*failFastOnException=*/ true,
- /*errorClassifier=*/ SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER);
+ new BFSVisitingTaskExecutor(
+ FIXED_THREAD_POOL_EXECUTOR, SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER);
}
/** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */
@@ -466,16 +461,7 @@ class ParallelSkyQueryUtils {
void visitAndWaitForCompletion(Iterable<SkyKey> keys)
throws QueryException, InterruptedException {
processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys)));
- // We add the scheduler to the pool, allowing it (as well as any submitted tasks later)
- // to be failed fast if any QueryException or InterruptedException is received.
- executor.execute(new Scheduler());
- try {
- executor.awaitQuiescence(true);
- } catch (RuntimeQueryException e) {
- throw (QueryException) e.getCause();
- } catch (RuntimeInterruptedException e) {
- throw (InterruptedException) e.getCause();
- }
+ executor.bfsVisitAndWaitForCompletion();
}
/**
@@ -502,49 +488,6 @@ class ParallelSkyQueryUtils {
return builder.build();
}
- private class Scheduler implements Runnable {
- @Override
- public void run() {
- // The scheduler keeps running until both the following two conditions are met.
- //
- // 1. There is no pending visit in the queue.
- // 2. There is no pending task (other than itself) in the pool.
- if (processingQueue.isEmpty() && executor.getRemainingTasksCount() <= 1) {
- return;
- }
-
- // To achieve maximum efficiency, queue is drained in either of the following 2 conditions:
- //
- // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
- // 2. The process queue size is large.
- if (executor.getRemainingTasksCount() < MIN_PENDING_TASKS
- || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
- drainProcessingQueue();
- }
-
- try {
- // Wait at most {@code SCHEDULING_INTERVAL_MILLISECONDS} milliseconds.
- Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
- } catch (InterruptedException e) {
- throw new RuntimeInterruptedException(e);
- }
-
- executor.execute(new Scheduler());
- }
-
- private void drainProcessingQueue() {
- Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
- processingQueue.drainTo(pendingKeysToVisit);
- if (pendingKeysToVisit.isEmpty()) {
- return;
- }
-
- for (Task task : getVisitTasks(pendingKeysToVisit)) {
- executor.execute(task);
- }
- }
- }
-
abstract static class Task implements Runnable {
@Override
@@ -598,6 +541,75 @@ class ParallelSkyQueryUtils {
processResultantTargets(keysToUseForResult, callback);
}
}
+
+ /**
+ * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and
+ * scheduler for parallel BFS visitations.
+ */
+ private class BFSVisitingTaskExecutor extends AbstractQueueVisitor {
+ private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) {
+ super(
+ /*concurrent=*/ true,
+ /*executorService=*/ executor,
+ // Leave the thread pool active for other current and future callers.
+ /*shutdownOnCompletion=*/ false,
+ /*failFastOnException=*/ true,
+ /*errorClassifier=*/ errorClassifier);
+ }
+
+ private void bfsVisitAndWaitForCompletion() throws QueryException, InterruptedException {
+ // The scheduler keeps running until either of the following two conditions are met.
+ //
+ // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail
+ // fast.
+ // 2. There is no pending visit in the queue and no pending task running.
+ while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) {
+ // To achieve maximum efficiency, queue is drained in either of the following two
+ // conditions:
+ //
+ // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
+ // 2. The process queue size is large.
+ if (getTaskCount() < MIN_PENDING_TASKS
+ || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
+
+ Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
+ processingQueue.drainTo(pendingKeysToVisit);
+ for (Task task : getVisitTasks(pendingKeysToVisit)) {
+ execute(task);
+ }
+ }
+
+ try {
+ Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
+ } catch (InterruptedException e) {
+ // If the main thread waiting for completion of the visitation is interrupted, we should
+ // gracefully terminate all running and pending tasks before exit. If QueryException
+ // occured in any of the worker thread, awaitTerminationAndPropagateErrorsIfAny
+ // propagates the QueryException instead of InterruptedException.
+ setInterrupted();
+ awaitTerminationAndPropagateErrorsIfAny();
+ throw e;
+ }
+ }
+
+ // We reach here either because the visitation is complete, or because an error prevents us
+ // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either
+ // gracefully exit if the visitation is complete, or propagate the exception if error
+ // occurred.
+ awaitTerminationAndPropagateErrorsIfAny();
+ }
+
+ private void awaitTerminationAndPropagateErrorsIfAny()
+ throws QueryException, InterruptedException {
+ try {
+ awaitTermination(/*interruptWorkers=*/ true);
+ } catch (RuntimeQueryException e) {
+ throw (QueryException) e.getCause();
+ } catch (RuntimeInterruptedException e) {
+ throw (InterruptedException) e.getCause();
+ }
+ }
+ }
}
private static class RuntimeQueryException extends RuntimeException {