diff options
author | Googler <noreply@google.com> | 2016-11-28 21:54:43 +0000 |
---|---|---|
committer | Irina Iancu <elenairina@google.com> | 2016-11-29 08:07:08 +0000 |
commit | 2b5038831c2514f65841ce4e40f8e4648250bf01 (patch) | |
tree | 7f3cda740abdad84307803c3fe5243697f6f3479 | |
parent | 47f48d2d500e5aa58fbbea8f163067653b0127f1 (diff) |
Update ParallelSkyQueryUtils to use QuiescingExecutor instead of ForkJoinPool
for concurrent visitations.
During BFS visitation of rdeps and rbuildfiles, it uses a centralized pool
(backed by a LinkedBlockingQueue) to store all pending visits, and a
periodically running scheduler to schedule tasks for each pending visit.
--
MOS_MIGRATED_REVID=140398162
5 files changed, 242 insertions, 130 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index 01cdca1d3e..45d1c56c15 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -385,6 +385,11 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } + @Override + public long getRemainingTasksCount() { + return remainingTasks.get(); + } + /** * Subclasses may override this to make dynamic decisions about whether to run tasks * asynchronously versus in-thread. diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java b/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java index f409bc2d73..93fc3c99f0 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java @@ -22,11 +22,11 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; /** A {@link BlockingQueue} with LIFO (last-in-first-out) ordering. */ -class BlockingStack<E> extends AbstractQueue<E> implements BlockingQueue<E> { +public class BlockingStack<E> extends AbstractQueue<E> implements BlockingQueue<E> { // We just restrict to only using the *First methods on the deque, turning it into a stack. 
private final BlockingDeque<E> deque; - BlockingStack() { + public BlockingStack() { this.deque = new LinkedBlockingDeque<>(); } diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java index 65718c6205..78bc93dc10 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java @@ -14,7 +14,6 @@ package com.google.devtools.build.lib.concurrent; import com.google.common.annotations.VisibleForTesting; - import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -53,6 +52,9 @@ public interface QuiescingExecutor extends Executor { */ void awaitQuiescence(boolean interruptWorkers) throws InterruptedException; + /** Return the number of tasks which are not completed (running or waiting to be executed). */ + long getRemainingTasksCount(); + /** Get latch that is released if a task throws an exception. Used only in tests. */ @VisibleForTesting CountDownLatch getExceptionLatchForTestingOnly(); diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 5f7447b198..086ecfc1b8 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -13,6 +13,7 @@ // limitations under the License. 
package com.google.devtools.build.lib.query2; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; @@ -22,11 +23,15 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.CompactHashSet; -import com.google.devtools.build.lib.concurrent.MoreFutures; +import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.devtools.build.lib.concurrent.BlockingStack; +import com.google.devtools.build.lib.concurrent.ErrorClassifier; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; +import com.google.devtools.build.lib.concurrent.QuiescingExecutor; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; @@ -45,10 +50,10 @@ import java.util.Collection; import java.util.LinkedList; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; -import java.util.concurrent.RecursiveAction; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Parallel implementations of various functionality in {@link SkyQueryEnvironment}. @@ -60,6 +65,10 @@ import java.util.concurrent.RecursiveAction; */ // TODO(bazel-team): Be more deliberate about bounding memory usage here. 
class ParallelSkyQueryUtils { + + /** The maximum number of keys to visit at once. */ + @VisibleForTesting static final int VISIT_BATCH_SIZE = 10000; + private ParallelSkyQueryUtils() { } @@ -72,14 +81,13 @@ class ParallelSkyQueryUtils { QueryExpression expression, VariableContext<Target> context, ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool, MultisetSemaphore<PackageIdentifier> packageSemaphore) throws QueryException, InterruptedException { env.eval( expression, context, new SkyKeyBFSVisitorCallback( - new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool, packageSemaphore))); + new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore))); } /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */ @@ -87,28 +95,24 @@ class ParallelSkyQueryUtils { SkyQueryEnvironment env, Collection<PathFragment> fileIdentifiers, ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool, MultisetSemaphore<PackageIdentifier> packageSemaphore) throws QueryException, InterruptedException { ThreadSafeUniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier(); RBuildFilesVisitor visitor = - new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback, packageSemaphore); + new RBuildFilesVisitor(env, keyUniquifier, callback, packageSemaphore); visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers)); } /** A helper class that computes 'rbuildfiles(<blah>)' via BFS. 
*/ private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor<SkyKey> { - private final SkyQueryEnvironment env; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private RBuildFilesVisitor( SkyQueryEnvironment env, - ForkJoinPool forkJoinPool, ThreadSafeUniquifier<SkyKey> uniquifier, Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { - super(forkJoinPool, uniquifier, callback); - this.env = env; + super(env, uniquifier, callback); this.packageSemaphore = packageSemaphore; } @@ -171,17 +175,14 @@ class ParallelSkyQueryUtils { */ private static class AllRdepsUnboundedVisitor extends AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> { - private final SkyQueryEnvironment env; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private AllRdepsUnboundedVisitor( SkyQueryEnvironment env, - ForkJoinPool forkJoinPool, ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier, ThreadSafeCallback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { - super(forkJoinPool, uniquifier, callback); - this.env = env; + super(env, uniquifier, callback); this.packageSemaphore = packageSemaphore; } @@ -194,7 +195,6 @@ class ParallelSkyQueryUtils { */ private static class Factory implements AbstractSkyKeyBFSVisitor.Factory { private final SkyQueryEnvironment env; - private final ForkJoinPool forkJoinPool; private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier; private final ThreadSafeCallback<Target> callback; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; @@ -202,10 +202,8 @@ class ParallelSkyQueryUtils { private Factory( SkyQueryEnvironment env, ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool, MultisetSemaphore<PackageIdentifier> packageSemaphore) { this.env = env; - this.forkJoinPool = forkJoinPool; this.uniquifier = env.createReverseDepSkyKeyUniquifier(); this.callback = callback; this.packageSemaphore = packageSemaphore; @@ -213,8 
+211,7 @@ class ParallelSkyQueryUtils { @Override public AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> create() { - return new AllRdepsUnboundedVisitor( - env, forkJoinPool, uniquifier, callback, packageSemaphore); + return new AllRdepsUnboundedVisitor(env, uniquifier, callback, packageSemaphore); } } @@ -267,29 +264,17 @@ class ParallelSkyQueryUtils { // recursive visitation. Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps = env.graph.getReverseDeps(filteredKeys); - // Build a collection of Pairs and group by package id so we can partition them efficiently - // later. - ArrayListMultimap<PackageIdentifier, Pair<SkyKey, SkyKey>> rdepsByPackage = - ArrayListMultimap.create(); + ImmutableList.Builder<Pair<SkyKey, SkyKey>> builder = ImmutableList.builder(); for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) { for (SkyKey rdep : rdeps.getValue()) { Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep); if (label != null) { - rdepsByPackage.put(label.getPackageIdentifier(), Pair.of(rdeps.getKey(), rdep)); + builder.add(Pair.of(rdeps.getKey(), rdep)); } } } - // A couple notes here: - // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we - // want. - // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid - // accidentally retaining the entire ArrayListMultimap object. - Iterable<Pair<SkyKey, SkyKey>> keysToVisit = ImmutableList.copyOf(rdepsByPackage.values()); - - // TODO(shazh): Use a global pool to store keys to be returned and keys to be processed, and - // assign them to VisitTasks. It allows us to better optimize package retrieval. 
- return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ keysToVisit); + return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ builder.build()); } @Override @@ -325,6 +310,33 @@ class ParallelSkyQueryUtils { } }); } + + @Override + protected Iterable<Task> getVisitTasks(Collection<Pair<SkyKey, SkyKey>> pendingKeysToVisit) { + // Group pending visits by package. + ArrayListMultimap<PackageIdentifier, Pair<SkyKey, SkyKey>> visitsByPackage = + ArrayListMultimap.create(); + for (Pair<SkyKey, SkyKey> visit : pendingKeysToVisit) { + Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(visit.second); + if (label != null) { + visitsByPackage.put(label.getPackageIdentifier(), visit); + } + } + + ImmutableList.Builder<Task> builder = ImmutableList.builder(); + + // A couple notes here: + // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we + // want. + // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid + // accidentally retaining the entire ArrayListMultimap object. + for (Iterable<Pair<SkyKey, SkyKey>> keysToVisitBatch : + Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) { + builder.add(new VisitTask(keysToVisitBatch)); + } + + return builder.build(); + } } /** @@ -349,28 +361,90 @@ class ParallelSkyQueryUtils { /** * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link - * ForkJoinPool}. + * QuiescingExecutor}. * - * <p>The choice of {@link ForkJoinPool} over, say, AbstractQueueVisitor backed by a - * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off a - * visitation and blocks on completion of it. But this visitation may never complete if there are - * a bounded number of threads in the global thread pool used for query evaluation! 
+ * <p>The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool + * NOT part of the global query evaluation pool to avoid starvation. */ @ThreadSafe private abstract static class AbstractSkyKeyBFSVisitor<T> { - private final ForkJoinPool forkJoinPool; + protected final SkyQueryEnvironment env; private final ThreadSafeUniquifier<T> uniquifier; private final Callback<Target> callback; - /** The maximum number of keys to visit at once. */ - private static final int VISIT_BATCH_SIZE = 10000; + + private final QuiescingExecutor executor; + + /** A queue to store pending visits. */ + private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>(); + + /** + * The max time interval between two scheduling passes in milliseconds. A scheduling pass is + * defined as the scheduler thread determining whether to drain all pending visits from the + * queue and submitting tasks to perform the visits. + * + * <p>The choice of 1ms is based on experiments. It is an attempted balance due to a + * few facts about the scheduling interval: + * + * <p>1. A large interval adds systematic delay. In an extreme case, a BFS visit which is + * supposed to take only 1ms now may take 5ms. For most BFS visits which take longer than a few + * hundred milliseconds, it should not be noticeable. + * + * <p>2. A zero-interval config eats too much CPU. + * + * <p>Even though the scheduler runs once every 1 ms, it does not try to drain the queue every time. + * Pending visits are drained only when certain criteria are met. + */ + private static final long SCHEDULING_INTERVAL_MILLISECONDS = 1; + + /** + * The minimum number of pending tasks the scheduler tries to hit. The 3x number is set based on + * experiments. We do not want to schedule tasks so frequently that we miss the benefits of a large + * number of keys being grouped by packages. On the other hand, we want to keep all threads in + * the pool busy to achieve full capacity.
A low number here will cause some of the worker + * threads to go idle at times before the next scheduling cycle. + * + * <p>TODO(shazh): Revisit the choice of task target based on real-prod performance. + */ + private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT; + + /** + * Fail fast on RuntimeExceptions, including {@code RuntimeInterruptedException} and {@code + * RuntimeQueryException}, which result from InterruptedException and QueryException. + */ + static final ErrorClassifier SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER = + new ErrorClassifier() { + @Override + protected ErrorClassification classifyException(Exception e) { + return (e instanceof RuntimeException) + ? ErrorClassification.CRITICAL_AND_LOG + : ErrorClassification.NOT_CRITICAL; + } + }; + + /** All BFS visitors share a single global fixed thread pool. */ + private static final ExecutorService FIXED_THREAD_POOL_EXECUTOR = + new ThreadPoolExecutor( + // Must be at least 2 worker threads in the pool (1 for the scheduler thread). + /*corePoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT), + /*maximumPoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT), + /*keepAliveTime=*/ 1, + /*units=*/ TimeUnit.SECONDS, + /*workQueue=*/ new BlockingStack<Runnable>(), + new ThreadFactoryBuilder().setNameFormat("skykey-bfs-visitor %d").build()); private AbstractSkyKeyBFSVisitor( - ForkJoinPool forkJoinPool, - ThreadSafeUniquifier<T> uniquifier, - Callback<Target> callback) { - this.forkJoinPool = forkJoinPool; + SkyQueryEnvironment env, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) { + this.env = env; this.uniquifier = uniquifier; this.callback = callback; + this.executor = + new AbstractQueueVisitor( + /*concurrent=*/ true, + /*executorService=*/ FIXED_THREAD_POOL_EXECUTOR, + // Leave the thread pool active for other current and future callers.
+ /*shutdownOnCompletion=*/ false, + /*failFastOnException=*/ true, + /*errorClassifier=*/ SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER); } /** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */ @@ -390,44 +464,103 @@ class ParallelSkyQueryUtils { void visitAndWaitForCompletion(Iterable<SkyKey> keys) throws QueryException, InterruptedException { - Iterable<ForkJoinTask<?>> tasks = - getTasks( - new Visit( - /*keysToUseForResult=*/ ImmutableList.<SkyKey>of(), - /*keysToVisit=*/ preprocessInitialVisit(keys))); - for (ForkJoinTask<?> task : tasks) { - forkJoinPool.execute(task); - } + processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys))); + // We add the scheduler to the pool, allowing it (as well as any submitted tasks later) + // to be failed fast if any QueryException or InterruptedException is received. + executor.execute(new Scheduler()); try { - MoreFutures.waitForAllInterruptiblyFailFast(tasks); - } catch (ExecutionException ee) { - Throwable cause = ee.getCause(); - if (cause instanceof RuntimeQueryException) { - throw (QueryException) cause.getCause(); - } else if (cause instanceof RuntimeInterruptedException) { - throw (InterruptedException) cause.getCause(); - } else { - throw new IllegalStateException(cause); + executor.awaitQuiescence(true); + } catch (RuntimeQueryException e) { + throw (QueryException) e.getCause(); + } catch (RuntimeInterruptedException e) { + throw (InterruptedException) e.getCause(); + } + } + + /** + * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in + * the full visitation to the given {@link Callback}. + */ + protected abstract void processResultantTargets( + Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + throws QueryException, InterruptedException; + + /** Gets the {@link Visit} representing the local visitation of the given {@code values}. 
*/ + protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException; + + /** Gets the first {@link Visit} representing the entry-level SkyKeys. */ + protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys); + + protected Iterable<Task> getVisitTasks(Collection<T> pendingKeysToVisit) { + ImmutableList.Builder<Task> builder = ImmutableList.builder(); + for (Iterable<T> keysToVisitBatch : + Iterables.partition(pendingKeysToVisit, VISIT_BATCH_SIZE)) { + builder.add(new VisitTask(keysToVisitBatch)); + } + + return builder.build(); + } + + private class Scheduler implements Runnable { + @Override + public void run() { + // The scheduler keeps running until both the following two conditions are met. + // + // 1. There is no pending visit in the queue. + // 2. There is no pending task (other than itself) in the pool. + if (processingQueue.isEmpty() && executor.getRemainingTasksCount() <= 1) { + return; + } + + // To achieve maximum efficiency, queue is drained in either of the following 2 conditions: + // + // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU. + // 2. The process queue size is large. + if (executor.getRemainingTasksCount() < MIN_PENDING_TASKS + || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) { + drainProcessingQueue(); + } + + try { + // Wait at most {@code SCHEDULING_INTERVAL_MILLISECONDS} milliseconds. 
+ Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeInterruptedException(e); + } + + executor.execute(new Scheduler()); + } + + private void drainProcessingQueue() { + Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size()); + processingQueue.drainTo(pendingKeysToVisit); + if (pendingKeysToVisit.isEmpty()) { + return; + } + + for (Task task : getVisitTasks(pendingKeysToVisit)) { + executor.execute(task); } } } - private abstract static class AbstractInternalRecursiveAction extends RecursiveAction { - protected abstract void computeImpl() throws QueryException, InterruptedException; + abstract static class Task implements Runnable { @Override - public final void compute() { + public void run() { try { - computeImpl(); - } catch (QueryException queryException) { - throw new RuntimeQueryException(queryException); - } catch (InterruptedException interruptedException) { - throw new RuntimeInterruptedException(interruptedException); + process(); + } catch (QueryException e) { + throw new RuntimeQueryException(e); + } catch (InterruptedException e) { + throw new RuntimeInterruptedException(e); } } + + abstract void process() throws QueryException, InterruptedException; } - private class VisitTask extends AbstractInternalRecursiveAction { + class VisitTask extends Task { private final Iterable<T> keysToVisit; private VisitTask(Iterable<T> keysToVisit) { @@ -435,22 +568,24 @@ class ParallelSkyQueryUtils { } @Override - protected void computeImpl() throws InterruptedException { + void process() throws InterruptedException { ImmutableList<T> uniqueKeys = uniquifier.unique(keysToVisit); if (uniqueKeys.isEmpty()) { return; } - Iterable<ForkJoinTask<?>> tasks = getTasks(getVisitResult(uniqueKeys)); - for (ForkJoinTask<?> task : tasks) { - task.fork(); - } - for (ForkJoinTask<?> task : tasks) { - task.join(); + + Visit visit = getVisitResult(uniqueKeys); + for (Iterable<SkyKey> keysToUseForResultBatch : + 
Iterables.partition( + visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) { + executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch)); } + + processingQueue.addAll(ImmutableList.copyOf(visit.keysToVisit)); } } - private class GetAndProcessResultsTask extends AbstractInternalRecursiveAction { + private class GetAndProcessResultsTask extends Task { private final Iterable<SkyKey> keysToUseForResult; private GetAndProcessResultsTask(Iterable<SkyKey> keysToUseForResult) { @@ -458,42 +593,10 @@ class ParallelSkyQueryUtils { } @Override - protected void computeImpl() throws QueryException, InterruptedException { + protected void process() throws QueryException, InterruptedException { processResultantTargets(keysToUseForResult, callback); } } - - private Iterable<ForkJoinTask<?>> getTasks(Visit visit) { - // Split the given visit request into ForkJoinTasks for visiting keys and ForkJoinTasks for - // getting and outputting results, each of which obeys the separate batch limits. - // TODO(bazel-team): Attempt to group work on targets within the same package. - ImmutableList.Builder<ForkJoinTask<?>> tasksBuilder = ImmutableList.builder(); - // Fork the tasks for getting and outputting results first - this way we maximize for - // throughput to the underlying callback. - for (Iterable<SkyKey> keysToUseForResultBatch : Iterables.partition( - visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) { - tasksBuilder.add(new GetAndProcessResultsTask(keysToUseForResultBatch)); - } - for (Iterable<T> keysToVisitBatch : - Iterables.partition(visit.keysToVisit, VISIT_BATCH_SIZE)) { - tasksBuilder.add(new VisitTask(keysToVisitBatch)); - } - return tasksBuilder.build(); - } - - /** - * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s - * in the full visitation to the given {@link Callback}. 
- */ - protected abstract void processResultantTargets( - Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) - throws QueryException, InterruptedException; - - /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */ - protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException; - - /** Gets the first {@link Visit} representing the entry-level SkyKeys. */ - protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys); } private static class RuntimeQueryException extends RuntimeException { diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 705738ce84..87a57af2c4 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -853,17 +853,20 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> public Map<SkyKey, Target> makeTargetsFromPackageKeyToTargetKeyMap( Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException { ImmutableMap.Builder<SkyKey, Target> result = ImmutableMap.builder(); + Set<SkyKey> processedTargets = new HashSet<>(); Map<SkyKey, SkyValue> packageMap = graph.getSuccessfulValues(packageKeyToTargetKeyMap.keySet()); for (Map.Entry<SkyKey, SkyValue> entry : packageMap.entrySet()) { for (SkyKey targetKey : packageKeyToTargetKeyMap.get(entry.getKey())) { - try { - result.put( - targetKey, - ((PackageValue) entry.getValue()) - .getPackage() - .getTarget((SKYKEY_TO_LABEL.apply(targetKey)).getName())); - } catch (NoSuchTargetException e) { - // Skip missing target. 
+ if (processedTargets.add(targetKey)) { + try { + result.put( + targetKey, + ((PackageValue) entry.getValue()) + .getPackage() + .getTarget((SKYKEY_TO_LABEL.apply(targetKey)).getName())); + } catch (NoSuchTargetException e) { + // Skip missing target. + } } } } @@ -1013,8 +1016,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> ThreadSafeCallback<Target> callback, ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - ParallelSkyQueryUtils.getRBuildFilesParallel( - this, fileIdentifiers, callback, forkJoinPool, packageSemaphore); + ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, packageSemaphore); } /** @@ -1199,7 +1201,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { ParallelSkyQueryUtils.getAllRdepsUnboundedParallel( - this, expression, context, callback, forkJoinPool, packageSemaphore); + this, expression, context, callback, packageSemaphore); } @ThreadSafe |