diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java | 116 |
1 files changed, 87 insertions, 29 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index ac3e2a1be6..5f7447b198 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -18,12 +18,15 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.CompactHashSet; import com.google.devtools.build.lib.concurrent.MoreFutures; +import com.google.devtools.build.lib.concurrent.MultisetSemaphore; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; @@ -69,13 +72,14 @@ class ParallelSkyQueryUtils { QueryExpression expression, VariableContext<Target> context, ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool) + ForkJoinPool forkJoinPool, + MultisetSemaphore<PackageIdentifier> packageSemaphore) throws QueryException, InterruptedException { env.eval( expression, context, new SkyKeyBFSVisitorCallback( - new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool))); + new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool, packageSemaphore))); } /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */ @@ -83,24 +87,29 @@ class ParallelSkyQueryUtils { SkyQueryEnvironment env, Collection<PathFragment> fileIdentifiers, ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool) + ForkJoinPool forkJoinPool, + MultisetSemaphore<PackageIdentifier> packageSemaphore) throws QueryException, InterruptedException { ThreadSafeUniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier(); - RBuildFilesVisitor visitor = new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback); + RBuildFilesVisitor visitor = + new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback, packageSemaphore); visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers)); } /** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */ private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor<SkyKey> { private final SkyQueryEnvironment env; + private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private RBuildFilesVisitor( SkyQueryEnvironment env, ForkJoinPool forkJoinPool, ThreadSafeUniquifier<SkyKey> uniquifier, - Callback<Target> callback) { + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { super(forkJoinPool, uniquifier, callback); this.env = env; + this.packageSemaphore = packageSemaphore; } @Override @@ -125,10 +134,21 @@ class ParallelSkyQueryUtils { } @Override - protected Iterable<Target> getTargetsToAddToResult(Iterable<SkyKey> keysToUseForResult) - throws InterruptedException { - return SkyQueryEnvironment.getBuildFilesForPackageValues( - env.graph.getSuccessfulValues(keysToUseForResult).values()); + protected void processResultantTargets( + Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + throws QueryException, InterruptedException { + Set<PackageIdentifier> pkgIdsNeededForResult = + ImmutableSet.copyOf( + Iterables.transform( + keysToUseForResult, + SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)); + packageSemaphore.acquireAll(pkgIdsNeededForResult); + try { + callback.process(SkyQueryEnvironment.getBuildFilesForPackageValues( + env.graph.getSuccessfulValues(keysToUseForResult).values())); + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForResult); + } } @Override @@ -152,14 +172,17 @@ class ParallelSkyQueryUtils { private static class AllRdepsUnboundedVisitor extends AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> { private final SkyQueryEnvironment env; + private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private AllRdepsUnboundedVisitor( SkyQueryEnvironment env, ForkJoinPool forkJoinPool, ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier, - ThreadSafeCallback<Target> callback) { + ThreadSafeCallback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { super(forkJoinPool, uniquifier, callback); this.env = env; + this.packageSemaphore = packageSemaphore; } /** @@ -174,20 +197,24 @@ class ParallelSkyQueryUtils { private final ForkJoinPool forkJoinPool; private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier; private final ThreadSafeCallback<Target> callback; + private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private Factory( SkyQueryEnvironment env, ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool) { + ForkJoinPool forkJoinPool, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { this.env = env; this.forkJoinPool = forkJoinPool; this.uniquifier = env.createReverseDepSkyKeyUniquifier(); this.callback = callback; + this.packageSemaphore = packageSemaphore; } @Override public AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> create() { - return new AllRdepsUnboundedVisitor(env, forkJoinPool, uniquifier, callback); + return new AllRdepsUnboundedVisitor( + env, forkJoinPool, uniquifier, callback, packageSemaphore); } } @@ -213,13 +240,27 @@ class ParallelSkyQueryUtils { reverseDepsMap.get(reverseDepPair.first).add(reverseDepPair.second); } - // Filter out disallowed deps. We cannot defer the targetification any further as we do not - // want to retrieve the rdeps of unwanted nodes (targets). - if (!reverseDepsMap.isEmpty()) { - Collection<Target> filteredTargets = - env.filterRawReverseDepsOfTransitiveTraversalKeys(reverseDepsMap); - filteredKeys.addAll( - Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY)); + Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = + env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values())); + Set<PackageIdentifier> pkgIdsNeededForTargetification = + ImmutableSet.copyOf( + Iterables.transform( + packageKeyToTargetKeyMap.keySet(), + SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)); + packageSemaphore.acquireAll(pkgIdsNeededForTargetification); + + try { + // Filter out disallowed deps. We cannot defer the targetification any further as we do not + // want to retrieve the rdeps of unwanted nodes (targets). + if (!reverseDepsMap.isEmpty()) { + Collection<Target> filteredTargets = + env.filterRawReverseDepsOfTransitiveTraversalKeys( + reverseDepsMap, packageKeyToTargetKeyMap); + filteredKeys.addAll( + Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY)); + } + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForTargetification); } // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next @@ -252,9 +293,23 @@ class ParallelSkyQueryUtils { } @Override - protected Iterable<Target> getTargetsToAddToResult(Iterable<SkyKey> keysToUseForResult) - throws InterruptedException { - return env.makeTargetsFromSkyKeys(keysToUseForResult).values(); + protected void processResultantTargets( + Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + throws QueryException, InterruptedException { + Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = + env.makePackageKeyToTargetKeyMap(keysToUseForResult); + Set<PackageIdentifier> pkgIdsNeededForResult = + ImmutableSet.copyOf( + Iterables.transform( + packageKeyToTargetKeyMap.keySet(), + SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)); + packageSemaphore.acquireAll(pkgIdsNeededForResult); + try { + callback.process( + env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values()); + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForResult); + } } @Override @@ -294,7 +349,7 @@ class ParallelSkyQueryUtils { /** * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link - * ForkJoinQuiescingExecutor}. + * ForkJoinPool}. * * <p>The choice of {@link ForkJoinPool} over, say, AbstractQueueVisitor backed by a * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off a @@ -310,7 +365,9 @@ class ParallelSkyQueryUtils { private static final int VISIT_BATCH_SIZE = 10000; private AbstractSkyKeyBFSVisitor( - ForkJoinPool forkJoinPool, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) { + ForkJoinPool forkJoinPool, + ThreadSafeUniquifier<T> uniquifier, + Callback<Target> callback) { this.forkJoinPool = forkJoinPool; this.uniquifier = uniquifier; this.callback = callback; @@ -402,7 +459,7 @@ class ParallelSkyQueryUtils { @Override protected void computeImpl() throws QueryException, InterruptedException { - callback.process(getTargetsToAddToResult(keysToUseForResult)); + processResultantTargets(keysToUseForResult, callback); } } @@ -425,11 +482,12 @@ class ParallelSkyQueryUtils { } /** - * Gets the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in the - * full visitation. + * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s + * in the full visitation to the given {@link Callback}. */ - protected abstract Iterable<Target> getTargetsToAddToResult( - Iterable<SkyKey> keysToUseForResult) throws InterruptedException; + protected abstract void processResultantTargets( + Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + throws QueryException, InterruptedException; /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */ protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException; |