diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib')
3 files changed, 97 insertions, 82 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index 45d1c56c15..9bbb2af1eb 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -108,9 +108,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * Flag used to record when all threads were killed by failed action execution. Only ever * transitions from {@code false} to {@code true}. * - * <p>May only be accessed in a block that is synchronized on {@link #zeroRemainingTasks}. + * <p>Except for {@link #mustJobsBeStopped}, may only be accessed in a block that is synchronized + * on {@link #zeroRemainingTasks}. */ - private boolean jobsMustBeStopped = false; + private volatile boolean jobsMustBeStopped = false; /** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */ private final AtomicLongMap<Thread> jobs = AtomicLongMap.create(); @@ -142,7 +143,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private final ErrorClassifier errorClassifier; - private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName()); + private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName()); /** * Create the {@link AbstractQueueVisitor}. @@ -385,11 +386,6 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } - @Override - public long getRemainingTasksCount() { - return remainingTasks.get(); - } - /** * Subclasses may override this to make dynamic decisions about whether to run tasks * asynchronously versus in-thread. @@ -415,10 +411,10 @@ public class AbstractQueueVisitor implements QuiescingExecutor { ErrorClassification errorClassification = errorClassifier.classify(e); switch (errorClassification) { case AS_CRITICAL_AS_POSSIBLE: - case CRITICAL_AND_LOG: - critical = true; - LOG.log(Level.WARNING, "Found critical error in queue visitor", e); - break; + case CRITICAL_AND_LOG: + critical = true; + logger.log(Level.WARNING, "Found critical error in queue visitor", e); + break; case CRITICAL: critical = true; break; @@ -524,7 +520,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } /** Set an internal flag to show that an interrupt was detected. */ - private void setInterrupted() { + protected final void setInterrupted() { threadInterrupted = true; } @@ -568,17 +564,27 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * Get number of jobs remaining. Note that this can increase in value if running tasks submit * further jobs. */ - @VisibleForTesting protected final long getTaskCount() { return remainingTasks.get(); } /** - * Waits for the task queue to drain, then shuts down the {@link ExecutorService} and - * waits for it to terminate. Throws (the same) unchecked exception if any - * worker thread failed unexpectedly. + * Whether all running and pending jobs will be stopped or cancelled. Also newly submitted tasks + * will be rejected if this is true. + * + * <p>This function returns the CURRENT state of whether jobs should be stopped. If the value is + * false right now, it may be changed to true by another thread later. + */ + protected final boolean mustJobsBeStopped() { + return jobsMustBeStopped; + } + + /** + * Waits for the task queue to drain. Then if {@code ownExecutorService} is true, shuts down the + * {@link ExecutorService} and waits for it to terminate. Throws (the same) unchecked exception if + * any worker thread failed unexpectedly. */ - private void awaitTermination(boolean interruptWorkers) throws InterruptedException { + protected final void awaitTermination(boolean interruptWorkers) throws InterruptedException { Throwables.propagateIfPossible(catastrophe); try { synchronized (zeroRemainingTasks) { diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java index 78bc93dc10..3424bb4adf 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java @@ -52,9 +52,6 @@ public interface QuiescingExecutor extends Executor { */ void awaitQuiescence(boolean interruptWorkers) throws InterruptedException; - /** Return the number of tasks which are not completed (running or waiting to be executed). */ - long getRemainingTasksCount(); - /** Get latch that is released if a task throws an exception. Used only in tests. */ @VisibleForTesting CountDownLatch getExceptionLatchForTestingOnly(); diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 30a27f8e63..95486c6ef9 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -373,7 +373,7 @@ class ParallelSkyQueryUtils { private final ThreadSafeUniquifier<T> uniquifier; private final Callback<Target> callback; - private final QuiescingExecutor executor; + private final BFSVisitingTaskExecutor executor; /** A queue to store pending visits. */ private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>(); @@ -439,13 +439,8 @@ class ParallelSkyQueryUtils { this.uniquifier = uniquifier; this.callback = callback; this.executor = - new AbstractQueueVisitor( - /*concurrent=*/ true, - /*executorService=*/ FIXED_THREAD_POOL_EXECUTOR, - // Leave the thread pool active for other current and future callers. - /*shutdownOnCompletion=*/ false, - /*failFastOnException=*/ true, - /*errorClassifier=*/ SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER); + new BFSVisitingTaskExecutor( + FIXED_THREAD_POOL_EXECUTOR, SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER); } /** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */ @@ -466,16 +461,7 @@ class ParallelSkyQueryUtils { void visitAndWaitForCompletion(Iterable<SkyKey> keys) throws QueryException, InterruptedException { processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys))); - // We add the scheduler to the pool, allowing it (as well as any submitted tasks later) - // to be failed fast if any QueryException or InterruptedException is received. - executor.execute(new Scheduler()); - try { - executor.awaitQuiescence(true); - } catch (RuntimeQueryException e) { - throw (QueryException) e.getCause(); - } catch (RuntimeInterruptedException e) { - throw (InterruptedException) e.getCause(); - } + executor.bfsVisitAndWaitForCompletion(); } /** @@ -502,49 +488,6 @@ class ParallelSkyQueryUtils { return builder.build(); } - private class Scheduler implements Runnable { - @Override - public void run() { - // The scheduler keeps running until both the following two conditions are met. - // - // 1. There is no pending visit in the queue. - // 2. There is no pending task (other than itself) in the pool. - if (processingQueue.isEmpty() && executor.getRemainingTasksCount() <= 1) { - return; - } - - // To achieve maximum efficiency, queue is drained in either of the following 2 conditions: - // - // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU. - // 2. The process queue size is large. - if (executor.getRemainingTasksCount() < MIN_PENDING_TASKS - || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) { - drainProcessingQueue(); - } - - try { - // Wait at most {@code SCHEDULING_INTERVAL_MILLISECONDS} milliseconds. - Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS); - } catch (InterruptedException e) { - throw new RuntimeInterruptedException(e); - } - - executor.execute(new Scheduler()); - } - - private void drainProcessingQueue() { - Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size()); - processingQueue.drainTo(pendingKeysToVisit); - if (pendingKeysToVisit.isEmpty()) { - return; - } - - for (Task task : getVisitTasks(pendingKeysToVisit)) { - executor.execute(task); - } - } - } - abstract static class Task implements Runnable { @Override @@ -598,6 +541,75 @@ class ParallelSkyQueryUtils { processResultantTargets(keysToUseForResult, callback); } } + + /** + * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and + * scheduler for parallel BFS visitations. + */ + private class BFSVisitingTaskExecutor extends AbstractQueueVisitor { + private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) { + super( + /*concurrent=*/ true, + /*executorService=*/ executor, + // Leave the thread pool active for other current and future callers. + /*shutdownOnCompletion=*/ false, + /*failFastOnException=*/ true, + /*errorClassifier=*/ errorClassifier); + } + + private void bfsVisitAndWaitForCompletion() throws QueryException, InterruptedException { + // The scheduler keeps running until either of the following two conditions are met. + // + // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail + // fast. + // 2. There is no pending visit in the queue and no pending task running. + while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) { + // To achieve maximum efficiency, queue is drained in either of the following two + // conditions: + // + // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU. + // 2. The process queue size is large. + if (getTaskCount() < MIN_PENDING_TASKS + || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) { + + Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size()); + processingQueue.drainTo(pendingKeysToVisit); + for (Task task : getVisitTasks(pendingKeysToVisit)) { + execute(task); + } + } + + try { + Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS); + } catch (InterruptedException e) { + // If the main thread waiting for completion of the visitation is interrupted, we should + // gracefully terminate all running and pending tasks before exit. If QueryException + // occured in any of the worker thread, awaitTerminationAndPropagateErrorsIfAny + // propagates the QueryException instead of InterruptedException. + setInterrupted(); + awaitTerminationAndPropagateErrorsIfAny(); + throw e; + } + } + + // We reach here either because the visitation is complete, or because an error prevents us + // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either + // gracefully exit if the visitation is complete, or propagate the exception if error + // occurred. + awaitTerminationAndPropagateErrorsIfAny(); + } + + private void awaitTerminationAndPropagateErrorsIfAny() + throws QueryException, InterruptedException { + try { + awaitTermination(/*interruptWorkers=*/ true); + } catch (RuntimeQueryException e) { + throw (QueryException) e.getCause(); + } catch (RuntimeInterruptedException e) { + throw (InterruptedException) e.getCause(); + } + } + } } private static class RuntimeQueryException extends RuntimeException { |