diff options
author | 2017-05-16 05:25:49 +0200 | |
---|---|---|
committer | 2017-05-16 15:24:46 +0200 | |
commit | 7184b6f55a8cb72094a481d056fe89bc7be80c76 (patch) | |
tree | 1378cc258d8874356baac702c2b5960d90178588 /src | |
parent | 23004957a56d304106d5dbf0d6ea52ec10713b39 (diff) |
Have TransitiveTraversalValues store kind of targets which have errors when computing TransitiveTraversalValues.
RELNOTES: None
PiperOrigin-RevId: 156138657
Diffstat (limited to 'src')
5 files changed, 382 insertions, 328 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 07249b50fb..28cdede50e 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -24,16 +24,10 @@ import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.CompactHashSet; -import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; -import com.google.devtools.build.lib.concurrent.BlockingStack; -import com.google.devtools.build.lib.concurrent.ErrorClassifier; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; -import com.google.devtools.build.lib.concurrent.QuiescingExecutor; -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; @@ -51,10 +45,6 @@ import java.util.Collection; import java.util.LinkedList; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** * Parallel implementations of various functionality in {@link SkyQueryEnvironment}. @@ -86,7 +76,7 @@ class ParallelSkyQueryUtils { return env.eval( expression, context, - new SkyKeyBFSVisitorCallback( + ParallelVisitor.createParallelVisitorCallback( new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore))); } @@ -104,7 +94,8 @@ class ParallelSkyQueryUtils { } /** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */ - private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor<SkyKey> { + private static class RBuildFilesVisitor extends ParallelVisitor<SkyKey> { + private final SkyQueryEnvironment env; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private RBuildFilesVisitor( @@ -112,7 +103,8 @@ class ParallelSkyQueryUtils { Uniquifier<SkyKey> uniquifier, Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { - super(env, uniquifier, callback); + super(uniquifier, callback, VISIT_BATCH_SIZE); + this.env = env; this.packageSemaphore = packageSemaphore; } @@ -173,8 +165,8 @@ class ParallelSkyQueryUtils { * even with 10M edges, the memory overhead is around 160M, and the memory can be reclaimed by * regular GC. */ - private static class AllRdepsUnboundedVisitor - extends AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> { + private static class AllRdepsUnboundedVisitor extends ParallelVisitor<Pair<SkyKey, SkyKey>> { + private final SkyQueryEnvironment env; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private AllRdepsUnboundedVisitor( @@ -182,7 +174,8 @@ class ParallelSkyQueryUtils { Uniquifier<Pair<SkyKey, SkyKey>> uniquifier, Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { - super(env, uniquifier, callback); + super(uniquifier, callback, VISIT_BATCH_SIZE); + this.env = env; this.packageSemaphore = packageSemaphore; } @@ -192,7 +185,7 @@ class ParallelSkyQueryUtils { * single {@link Callback#process} call. Note that all the created instances share the same * {@link Uniquifier} so that we don't visit the same Skyframe node more than once. */ - private static class Factory implements AbstractSkyKeyBFSVisitor.Factory { + private static class Factory implements ParallelVisitor.Factory { private final SkyQueryEnvironment env; private final Uniquifier<Pair<SkyKey, SkyKey>> uniquifier; private final Callback<Target> callback; @@ -209,7 +202,7 @@ class ParallelSkyQueryUtils { } @Override - public AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> create() { + public ParallelVisitor<Pair<SkyKey, SkyKey>> create() { return new AllRdepsUnboundedVisitor(env, uniquifier, callback, packageSemaphore); } } @@ -337,290 +330,5 @@ class ParallelSkyQueryUtils { return builder.build(); } } - - /** - * A {@link Callback} whose {@link Callback#process} method kicks off a BFS visitation via a fresh - * {@link AbstractSkyKeyBFSVisitor} instance. - */ - private static class SkyKeyBFSVisitorCallback implements Callback<Target> { - private final AbstractSkyKeyBFSVisitor.Factory visitorFactory; - - private SkyKeyBFSVisitorCallback(AbstractSkyKeyBFSVisitor.Factory visitorFactory) { - this.visitorFactory = visitorFactory; - } - - @Override - public void process(Iterable<Target> partialResult) - throws QueryException, InterruptedException { - AbstractSkyKeyBFSVisitor<?> visitor = visitorFactory.create(); - // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on - // another, potentially expensive computation. Refactor to something like "processAsync". - visitor.visitAndWaitForCompletion( - SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult)); - } - } - - /** - * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link - * QuiescingExecutor}. - * - * <p>The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool - * NOT part of the global query evaluation pool to avoid starvation. - */ - @ThreadSafe - private abstract static class AbstractSkyKeyBFSVisitor<T> { - protected final SkyQueryEnvironment env; - private final Uniquifier<T> uniquifier; - private final Callback<Target> callback; - - private final BFSVisitingTaskExecutor executor; - - /** A queue to store pending visits. */ - private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>(); - - /** - * The max time interval between two scheduling passes in milliseconds. A scheduling pass is - * defined as the scheduler thread determining whether to drain all pending visits from the - * queue and submitting tasks to perform the visits. - * - * <p>The choice of 1ms is a result based of experiments. It is an attempted balance due to a - * few facts about the scheduling interval: - * - * <p>1. A large interval adds systematic delay. In an extreme case, a BFS visit which is - * supposed to take only 1ms now may take 5ms. For most BFS visits which take longer than a few - * hundred milliseconds, it should not be noticeable. - * - * <p>2. A zero-interval config eats too much CPU. - * - * <p>Even though the scheduler runs once every 1 ms, it does not try to drain it every time. - * Pending visits are drained only certain criteria are met. - */ - private static final long SCHEDULING_INTERVAL_MILLISECONDS = 1; - - /** - * The minimum number of pending tasks the scheduler tries to hit. The 3x number is set based on - * experiments. We do not want to schedule tasks too frequently to miss the benefits of large - * number of keys being grouped by packages. On the other hand, we want to keep all threads in - * the pool busy to achieve full capacity. A low number here will cause some of the worker - * threads to go idle at times before the next scheduling cycle. - * - * <p>TODO(shazh): Revisit the choice of task target based on real-prod performance. - */ - private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT; - - /** - * Fail fast on RuntimeExceptions, including {code RuntimeInterruptedException} and {@code - * RuntimeQueryException}, which are resulted from InterruptedException and QueryException. - */ - static final ErrorClassifier SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER = - new ErrorClassifier() { - @Override - protected ErrorClassification classifyException(Exception e) { - return (e instanceof RuntimeException) - ? ErrorClassification.CRITICAL_AND_LOG - : ErrorClassification.NOT_CRITICAL; - } - }; - - /** All BFS visitors share a single global fixed thread pool. */ - private static final ExecutorService FIXED_THREAD_POOL_EXECUTOR = - new ThreadPoolExecutor( - // Must be at least 2 worker threads in the pool (1 for the scheduler thread). - /*corePoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT), - /*maximumPoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT), - /*keepAliveTime=*/ 1, - /*units=*/ TimeUnit.SECONDS, - /*workQueue=*/ new BlockingStack<Runnable>(), - new ThreadFactoryBuilder().setNameFormat("skykey-bfs-visitor %d").build()); - - private AbstractSkyKeyBFSVisitor( - SkyQueryEnvironment env, Uniquifier<T> uniquifier, Callback<Target> callback) { - this.env = env; - this.uniquifier = uniquifier; - this.callback = callback; - this.executor = - new BFSVisitingTaskExecutor( - FIXED_THREAD_POOL_EXECUTOR, SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER); - } - - /** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */ - private static interface Factory { - AbstractSkyKeyBFSVisitor<?> create(); - } - - protected final class Visit { - private final Iterable<SkyKey> keysToUseForResult; - private final Iterable<T> keysToVisit; - - private Visit(Iterable<SkyKey> keysToUseForResult, Iterable<T> keysToVisit) { - this.keysToUseForResult = keysToUseForResult; - this.keysToVisit = keysToVisit; - } - } - - void visitAndWaitForCompletion(Iterable<SkyKey> keys) - throws QueryException, InterruptedException { - processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys))); - executor.bfsVisitAndWaitForCompletion(); - } - - /** - * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in - * the full visitation to the given {@link Callback}. - */ - protected abstract void processResultantTargets( - Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) - throws QueryException, InterruptedException; - - /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */ - protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException; - - /** Gets the first {@link Visit} representing the entry-level SkyKeys. */ - protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys); - - protected Iterable<Task> getVisitTasks(Collection<T> pendingKeysToVisit) { - ImmutableList.Builder<Task> builder = ImmutableList.builder(); - for (Iterable<T> keysToVisitBatch : - Iterables.partition(pendingKeysToVisit, VISIT_BATCH_SIZE)) { - builder.add(new VisitTask(keysToVisitBatch)); - } - - return builder.build(); - } - - abstract static class Task implements Runnable { - - @Override - public void run() { - try { - process(); - } catch (QueryException e) { - throw new RuntimeQueryException(e); - } catch (InterruptedException e) { - throw new RuntimeInterruptedException(e); - } - } - - abstract void process() throws QueryException, InterruptedException; - } - - class VisitTask extends Task { - private final Iterable<T> keysToVisit; - - private VisitTask(Iterable<T> keysToVisit) { - this.keysToVisit = keysToVisit; - } - - @Override - void process() throws InterruptedException { - ImmutableList<T> uniqueKeys = uniquifier.unique(keysToVisit); - if (uniqueKeys.isEmpty()) { - return; - } - - Visit visit = getVisitResult(uniqueKeys); - for (Iterable<SkyKey> keysToUseForResultBatch : - Iterables.partition( - visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) { - executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch)); - } - - processingQueue.addAll(ImmutableList.copyOf(visit.keysToVisit)); - } - } - - private class GetAndProcessResultsTask extends Task { - private final Iterable<SkyKey> keysToUseForResult; - - private GetAndProcessResultsTask(Iterable<SkyKey> keysToUseForResult) { - this.keysToUseForResult = keysToUseForResult; - } - - @Override - protected void process() throws QueryException, InterruptedException { - processResultantTargets(keysToUseForResult, callback); - } - } - - /** - * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and - * scheduler for parallel BFS visitations. - */ - private class BFSVisitingTaskExecutor extends AbstractQueueVisitor { - private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) { - super( - /*executorService=*/ executor, - // Leave the thread pool active for other current and future callers. - /*shutdownOnCompletion=*/ false, - /*failFastOnException=*/ true, - /*errorClassifier=*/ errorClassifier); - } - - private void bfsVisitAndWaitForCompletion() throws QueryException, InterruptedException { - // The scheduler keeps running until either of the following two conditions are met. - // - // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail - // fast. - // 2. There is no pending visit in the queue and no pending task running. - while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) { - // To achieve maximum efficiency, queue is drained in either of the following two - // conditions: - // - // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU. - // 2. The process queue size is large. - if (getTaskCount() < MIN_PENDING_TASKS - || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) { - - Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size()); - processingQueue.drainTo(pendingKeysToVisit); - for (Task task : getVisitTasks(pendingKeysToVisit)) { - execute(task); - } - } - - try { - Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS); - } catch (InterruptedException e) { - // If the main thread waiting for completion of the visitation is interrupted, we should - // gracefully terminate all running and pending tasks before exit. If QueryException - // occured in any of the worker thread, awaitTerminationAndPropagateErrorsIfAny - // propagates the QueryException instead of InterruptedException. - setInterrupted(); - awaitTerminationAndPropagateErrorsIfAny(); - throw e; - } - } - - // We reach here either because the visitation is complete, or because an error prevents us - // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either - // gracefully exit if the visitation is complete, or propagate the exception if error - // occurred. - awaitTerminationAndPropagateErrorsIfAny(); - } - - private void awaitTerminationAndPropagateErrorsIfAny() - throws QueryException, InterruptedException { - try { - awaitTermination(/*interruptWorkers=*/ true); - } catch (RuntimeQueryException e) { - throw (QueryException) e.getCause(); - } catch (RuntimeInterruptedException e) { - throw (InterruptedException) e.getCause(); - } - } - } - } - - private static class RuntimeQueryException extends RuntimeException { - private RuntimeQueryException(QueryException queryException) { - super(queryException); - } - } - - private static class RuntimeInterruptedException extends RuntimeException { - private RuntimeInterruptedException(InterruptedException interruptedException) { - super(interruptedException); - } - } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java new file mode 100644 index 0000000000..73e291e118 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java @@ -0,0 +1,326 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.query2; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.devtools.build.lib.concurrent.BlockingStack; +import com.google.devtools.build.lib.concurrent.ErrorClassifier; +import com.google.devtools.build.lib.concurrent.QuiescingExecutor; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import com.google.devtools.build.lib.packages.Target; +import com.google.devtools.build.lib.query2.engine.Callback; +import com.google.devtools.build.lib.query2.engine.QueryException; +import com.google.devtools.build.lib.query2.engine.Uniquifier; +import com.google.devtools.build.skyframe.SkyKey; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * A helper class for performing a custom visitation on the Skyframe graph, using {@link + * QuiescingExecutor}. + * + * <p>The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool NOT + * part of the global query evaluation pool to avoid starvation. + */ +@ThreadSafe +public abstract class ParallelVisitor<T> { + private final Uniquifier<T> uniquifier; + private final Callback<Target> callback; + private final int visitBatchSize; + + private final VisitingTaskExecutor executor; + + /** A queue to store pending visits. */ + private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>(); + + /** + * The max time interval between two scheduling passes in milliseconds. A scheduling pass is + * defined as the scheduler thread determining whether to drain all pending visits from the queue + * and submitting tasks to perform the visits. + * + * <p>The choice of 1ms is a result based of experiments. It is an attempted balance due to a few + * facts about the scheduling interval: + * + * <p>1. A large interval adds systematic delay. In an extreme case, a visit which is supposed to + * take only 1ms now may take 5ms. For most visits which take longer than a few hundred + * milliseconds, it should not be noticeable. + * + * <p>2. A zero-interval config eats too much CPU. + * + * <p>Even though the scheduler runs once every 1 ms, it does not try to drain it every time. + * Pending visits are drained only certain criteria are met. + */ + private static final long SCHEDULING_INTERVAL_MILLISECONDS = 1; + + /** + * The minimum number of pending tasks the scheduler tries to hit. The 3x number is set based on + * experiments. We do not want to schedule tasks too frequently to miss the benefits of large + * number of keys being grouped by packages. On the other hand, we want to keep all threads in the + * pool busy to achieve full capacity. A low number here will cause some of the worker threads to + * go idle at times before the next scheduling cycle. + * + * <p>TODO(shazh): Revisit the choice of task target based on real-prod performance. + */ + private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT; + + /** + * Fail fast on RuntimeExceptions, including {code RuntimeInterruptedException} and {@code + * RuntimeQueryException}, which are resulted from InterruptedException and QueryException. + */ + static final ErrorClassifier PARALLEL_VISITOR_ERROR_CLASSIFIER = + new ErrorClassifier() { + @Override + protected ErrorClassification classifyException(Exception e) { + return (e instanceof RuntimeException) + ? ErrorClassification.CRITICAL_AND_LOG + : ErrorClassification.NOT_CRITICAL; + } + }; + + /** All visitors share a single global fixed thread pool. */ + private static final ExecutorService FIXED_THREAD_POOL_EXECUTOR = + new ThreadPoolExecutor( + /*corePoolSize=*/ Math.max(1, SkyQueryEnvironment.DEFAULT_THREAD_COUNT), + /*maximumPoolSize=*/ Math.max(1, SkyQueryEnvironment.DEFAULT_THREAD_COUNT), + /*keepAliveTime=*/ 1, + /*units=*/ TimeUnit.SECONDS, + /*workQueue=*/ new BlockingStack<Runnable>(), + new ThreadFactoryBuilder().setNameFormat("parallel-visitor %d").build()); + + protected ParallelVisitor( + Uniquifier<T> uniquifier, Callback<Target> callback, int visitBatchSize) { + this.uniquifier = uniquifier; + this.callback = callback; + this.visitBatchSize = visitBatchSize; + this.executor = + new VisitingTaskExecutor(FIXED_THREAD_POOL_EXECUTOR, PARALLEL_VISITOR_ERROR_CLASSIFIER); + } + + /** Factory for {@link ParallelVisitor} instances. */ + public static interface Factory { + ParallelVisitor<?> create(); + } + + /** + * Returns a {@link Callback} which kicks off a parallel visitation when {@link Callback#process} + * is invoked. + */ + public static Callback<Target> createParallelVisitorCallback(Factory visitorFactory) { + return new ParallelVisitorCallback(visitorFactory); + } + + /** An object to hold keys to visit and keys ready for processing. */ + protected final class Visit { + private final Iterable<SkyKey> keysToUseForResult; + private final Iterable<T> keysToVisit; + + public Visit(Iterable<SkyKey> keysToUseForResult, Iterable<T> keysToVisit) { + this.keysToUseForResult = keysToUseForResult; + this.keysToVisit = keysToVisit; + } + } + + void visitAndWaitForCompletion(Iterable<SkyKey> keys) + throws QueryException, InterruptedException { + processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys))); + executor.visitAndWaitForCompletion(); + } + + /** + * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in + * the full visitation to the given {@link Callback}. + */ + protected abstract void processResultantTargets( + Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + throws QueryException, InterruptedException; + + /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */ + protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException; + + /** Gets the first {@link Visit} representing the entry-level SkyKeys. */ + protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys); + + /** Gets tasks to visit pending keys. */ + protected Iterable<Task> getVisitTasks(Collection<T> pendingKeysToVisit) { + ImmutableList.Builder<Task> builder = ImmutableList.builder(); + for (Iterable<T> keysToVisitBatch : Iterables.partition(pendingKeysToVisit, visitBatchSize)) { + builder.add(new VisitTask(keysToVisitBatch)); + } + + return builder.build(); + } + + /** A {@link Runnable} which handles {@link QueryException} and {@link InterruptedException}. */ + protected abstract static class Task implements Runnable { + + @Override + public void run() { + try { + process(); + } catch (QueryException e) { + throw new RuntimeQueryException(e); + } catch (InterruptedException e) { + throw new RuntimeInterruptedException(e); + } + } + + abstract void process() throws QueryException, InterruptedException; + } + + class VisitTask extends Task { + private final Iterable<T> keysToVisit; + + VisitTask(Iterable<T> keysToVisit) { + this.keysToVisit = keysToVisit; + } + + @Override + void process() throws InterruptedException { + ImmutableList<T> uniqueKeys = uniquifier.unique(keysToVisit); + if (uniqueKeys.isEmpty()) { + return; + } + + Visit visit = getVisitResult(uniqueKeys); + for (Iterable<SkyKey> keysToUseForResultBatch : + Iterables.partition(visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) { + executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch)); + } + + processingQueue.addAll(ImmutableList.copyOf(visit.keysToVisit)); + } + } + + private class GetAndProcessResultsTask extends Task { + private final Iterable<SkyKey> keysToUseForResult; + + private GetAndProcessResultsTask(Iterable<SkyKey> keysToUseForResult) { + this.keysToUseForResult = keysToUseForResult; + } + + @Override + protected void process() throws QueryException, InterruptedException { + processResultantTargets(keysToUseForResult, callback); + } + } + + /** + * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and + * scheduler for parallel visitations. + */ + private class VisitingTaskExecutor extends AbstractQueueVisitor { + private VisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) { + super( + /*executorService=*/ executor, + // Leave the thread pool active for other current and future callers. + /*shutdownOnCompletion=*/ false, + /*failFastOnException=*/ true, + /*errorClassifier=*/ errorClassifier); + } + + private void visitAndWaitForCompletion() throws QueryException, InterruptedException { + // The scheduler keeps running until either of the following two conditions are met. + // + // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail + // fast. + // 2. There is no pending visit in the queue and no pending task running. + while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) { + // To achieve maximum efficiency, queue is drained in either of the following two + // conditions: + // + // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU. + // 2. The process queue size is large. + if (getTaskCount() < MIN_PENDING_TASKS + || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) { + + Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size()); + processingQueue.drainTo(pendingKeysToVisit); + for (Task task : getVisitTasks(pendingKeysToVisit)) { + execute(task); + } + } + + try { + Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS); + } catch (InterruptedException e) { + // If the main thread waiting for completion of the visitation is interrupted, we should + // gracefully terminate all running and pending tasks before exit. If QueryException + // occured in any of the worker thread, awaitTerminationAndPropagateErrorsIfAny + // propagates the QueryException instead of InterruptedException. + setInterrupted(); + awaitTerminationAndPropagateErrorsIfAny(); + throw e; + } + } + + // We reach here either because the visitation is complete, or because an error prevents us + // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either + // gracefully exit if the visitation is complete, or propagate the exception if error + // occurred. + awaitTerminationAndPropagateErrorsIfAny(); + } + + private void awaitTerminationAndPropagateErrorsIfAny() + throws QueryException, InterruptedException { + try { + awaitTermination(/*interruptWorkers=*/ true); + } catch (RuntimeQueryException e) { + throw (QueryException) e.getCause(); + } catch (RuntimeInterruptedException e) { + throw (InterruptedException) e.getCause(); + } + } + } + + /** + * A {@link Callback} whose {@link Callback#process} method kicks off a visitation via a fresh + * {@link ParallelVisitor} instance. + */ + private static class ParallelVisitorCallback implements Callback<Target> { + private final ParallelVisitor.Factory visitorFactory; + + private ParallelVisitorCallback(ParallelVisitor.Factory visitorFactory) { + this.visitorFactory = visitorFactory; + } + + @Override + public void process(Iterable<Target> partialResult) + throws QueryException, InterruptedException { + ParallelVisitor<?> visitor = visitorFactory.create(); + // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on + // another, potentially expensive computation. Refactor to something like "processAsync". + visitor.visitAndWaitForCompletion( + SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult)); + } + } + + private static class RuntimeQueryException extends RuntimeException { + private RuntimeQueryException(QueryException queryException) { + super(queryException); + } + } + + private static class RuntimeInterruptedException extends RuntimeException { + private RuntimeInterruptedException(InterruptedException interruptedException) { + super(interruptedException); + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 631afcb002..551844e962 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -669,7 +669,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } @ThreadSafe - Uniquifier<SkyKey> createSkyKeyUniquifier() { + protected Uniquifier<SkyKey> createSkyKeyUniquifier() { return new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT); } diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalFunction.java index 4f11095490..76730690b8 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalFunction.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalFunction.java @@ -81,8 +81,13 @@ public class TransitiveTraversalFunction } } - protected Collection<Label> getAspectLabels(Rule fromRule, Attribute attr, Label toLabel, - ValueOrException2<NoSuchPackageException, NoSuchTargetException> toVal, Environment env) { + @Override + protected Collection<Label> getAspectLabels( + Rule fromRule, + Attribute attr, + Label toLabel, + ValueOrException2<NoSuchPackageException, NoSuchTargetException> toVal, + Environment env) { try { if (toVal == null) { return ImmutableList.of(); @@ -110,7 +115,8 @@ public class TransitiveTraversalFunction String firstErrorMessage = accumulator.getFirstErrorMessage(); return targetLoadedSuccessfully ? TransitiveTraversalValue.forTarget(targetAndErrorIfAny.getTarget(), firstErrorMessage) - : TransitiveTraversalValue.unsuccessfulTransitiveTraversal(firstErrorMessage); + : TransitiveTraversalValue.unsuccessfulTransitiveTraversal( + firstErrorMessage, targetAndErrorIfAny.getTarget()); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalValue.java b/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalValue.java index afc26d3568..4639868b76 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalValue.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalValue.java @@ -13,6 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.skyframe; +import com.google.common.base.MoreObjects; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; @@ -52,8 +53,16 @@ public abstract class TransitiveTraversalValue implements SkyValue { private static final InternerWithPresenceCheck<TransitiveTraversalValue> VALUE_INTERNER = new InternerWithPresenceCheck<>(); - static TransitiveTraversalValue unsuccessfulTransitiveTraversal(String firstErrorMessage) { - return new TransitiveTraversalValueWithError(Preconditions.checkNotNull(firstErrorMessage)); + private final String kind; + + protected TransitiveTraversalValue(String kind) { + this.kind = Preconditions.checkNotNull(kind); + } + + static TransitiveTraversalValue unsuccessfulTransitiveTraversal( + String firstErrorMessage, Target target) { + return new TransitiveTraversalValueWithError( + Preconditions.checkNotNull(firstErrorMessage), target.getTargetKind()); } static TransitiveTraversalValue forTarget(Target target, @Nullable String firstErrorMessage) { @@ -84,16 +93,16 @@ public abstract class TransitiveTraversalValue implements SkyValue { return value; } } else { - return new TransitiveTraversalValueWithError(firstErrorMessage); + return new TransitiveTraversalValueWithError(firstErrorMessage, target.getTargetKind()); } } public static TransitiveTraversalValue create( - AdvertisedProviderSet providers, @Nullable String kind, @Nullable String firstErrorMessage) { + AdvertisedProviderSet providers, String kind, @Nullable String firstErrorMessage) { TransitiveTraversalValue value = firstErrorMessage == null ? new TransitiveTraversalValueWithoutError(providers, kind) - : new TransitiveTraversalValueWithError(firstErrorMessage); + : new TransitiveTraversalValueWithError(firstErrorMessage, kind); if (firstErrorMessage == null) { TransitiveTraversalValue oldValue = VALUE_INTERNER.getCanonical(value); return oldValue == null ? value : oldValue; @@ -110,9 +119,10 @@ public abstract class TransitiveTraversalValue implements SkyValue { */ public abstract AdvertisedProviderSet getProviders(); - /** Returns the target kind, if any. */ - @Nullable - public abstract String getKind(); + /** Returns the target kind. */ + public String getKind() { + return kind; + } /** * Returns the first error message, if any, from loading the target and its transitive @@ -149,12 +159,11 @@ public abstract class TransitiveTraversalValue implements SkyValue { /** A transitive target reference without error. */ public static final class TransitiveTraversalValueWithoutError extends TransitiveTraversalValue { private final AdvertisedProviderSet advertisedProviders; - @Nullable private final String kind; private TransitiveTraversalValueWithoutError( AdvertisedProviderSet providers, @Nullable String kind) { + super(kind); this.advertisedProviders = Preconditions.checkNotNull(providers); - this.kind = kind; } @Override @@ -169,14 +178,16 @@ public abstract class TransitiveTraversalValue implements SkyValue { @Override @Nullable - public String getKind() { - return kind; + public String getFirstErrorMessage() { + return null; } @Override - @Nullable - public String getFirstErrorMessage() { - return null; + public String toString() { + return MoreObjects.toStringHelper(this) + .add("kind", getKind()) + .add("providers", advertisedProviders) + .toString(); } } @@ -184,7 +195,8 @@ public abstract class TransitiveTraversalValue implements SkyValue { public static final class TransitiveTraversalValueWithError extends TransitiveTraversalValue { private final String firstErrorMessage; - private TransitiveTraversalValueWithError(String firstErrorMessage) { + private TransitiveTraversalValueWithError(String firstErrorMessage, String kind) { + super(kind); this.firstErrorMessage = StringCanonicalizer.intern(Preconditions.checkNotNull(firstErrorMessage)); } @@ -201,14 +213,16 @@ public abstract class TransitiveTraversalValue implements SkyValue { @Override @Nullable - public String getKind() { - return null; + public String getFirstErrorMessage() { + return firstErrorMessage; } @Override - @Nullable - public String getFirstErrorMessage() { - return firstErrorMessage; + public String toString() { + return MoreObjects.toStringHelper(this) + .add("error", firstErrorMessage) + .add("kind", getKind()) + .toString(); } } } |