diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java | 590 |
1 files changed, 489 insertions, 101 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 585b39d695..e0a09851fa 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -16,21 +16,31 @@ package com.google.devtools.build.lib.query2; import static com.google.common.collect.ImmutableSet.toImmutableSet; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Maps; +import com.google.common.collect.MapMaker; import com.google.common.collect.Multimap; +import com.google.common.collect.Streams; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; +import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ThreadSafeMutableSet; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; +import com.google.devtools.build.lib.query2.engine.QueryUtil; +import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback; import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl; import com.google.devtools.build.lib.query2.engine.Uniquifier; import com.google.devtools.build.lib.query2.engine.VariableContext; @@ -40,7 +50,8 @@ import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.skyframe.SkyKey; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -63,10 +74,6 @@ public class ParallelSkyQueryUtils { private ParallelSkyQueryUtils() { } - /** - * Specialized parallel variant of {@link SkyQueryEnvironment#getAllRdeps} that is appropriate - * when there is no depth-bound. - */ static QueryTaskFuture<Void> getAllRdepsUnboundedParallel( SkyQueryEnvironment env, QueryExpression expression, @@ -77,7 +84,95 @@ public class ParallelSkyQueryUtils { expression, context, ParallelVisitor.createParallelVisitorCallback( - new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore))); + new RdepsUnboundedVisitor.Factory( + env, + /*universe=*/ Predicates.alwaysTrue(), + callback, + packageSemaphore))); + } + + static QueryTaskFuture<Void> getAllRdepsBoundedParallel( + SkyQueryEnvironment env, + QueryExpression expression, + int depth, + VariableContext<Target> context, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + return env.eval( + expression, + context, + ParallelVisitor.createParallelVisitorCallback( + new RdepsBoundedVisitor.Factory( + env, + depth, + /*universe=*/ Predicates.alwaysTrue(), + callback, + packageSemaphore))); + } + + static QueryTaskFuture<Void> getRdepsInUniverseUnboundedParallel( + SkyQueryEnvironment env, + QueryExpression expression, + Predicate<SkyKey> universe, + VariableContext<Target> context, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + return env.eval( + expression, + context, + ParallelVisitor.createParallelVisitorCallback( + new RdepsUnboundedVisitor.Factory(env, universe, callback, packageSemaphore))); + } + + static QueryTaskFuture<Predicate<SkyKey>> getDTCSkyKeyPredicateFuture( + SkyQueryEnvironment env, + QueryExpression expression, + VariableContext<Target> context, + int processResultsBatchSize, + int concurrencyLevel) { + QueryTaskFuture<ThreadSafeMutableSet<Target>> universeValueFuture = + QueryUtil.evalAll(env, context, expression); + + Function<ThreadSafeMutableSet<Target>, QueryTaskFuture<Predicate<SkyKey>>> + getTransitiveClosureAsyncFunction = + universeValue -> { + ThreadSafeAggregateAllSkyKeysCallback aggregateAllCallback = + new ThreadSafeAggregateAllSkyKeysCallback(concurrencyLevel); + return env.executeAsync( + () -> { + Callback<Target> visitorCallback = + ParallelVisitor.createParallelVisitorCallback( + new TransitiveClosureVisitor.Factory( + env, + env.createSkyKeyUniquifier(), + processResultsBatchSize, + aggregateAllCallback)); + visitorCallback.process(universeValue); + return Predicates.in(aggregateAllCallback.getResult()); + }); + }; + + return env.transformAsync(universeValueFuture, getTransitiveClosureAsyncFunction); + } + + static QueryTaskFuture<Void> getRdepsInUniverseBoundedParallel( + SkyQueryEnvironment env, + QueryExpression expression, + int depth, + Predicate<SkyKey> universe, + VariableContext<Target> context, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + return env.eval( + expression, + context, + ParallelVisitor.createParallelVisitorCallback( + new RdepsBoundedVisitor.Factory( + env, + depth, + universe, + callback, + packageSemaphore))); } /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */ @@ -191,8 +286,75 @@ public class ParallelSkyQueryUtils { } } + private abstract static class AbstractRdepsVisitor<T> extends ParallelVisitor<T, Target> { + private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE; + + protected final SkyQueryEnvironment env; + protected final MultisetSemaphore<PackageIdentifier> packageSemaphore; + + protected AbstractRdepsVisitor( + SkyQueryEnvironment env, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE); + this.env = env; + this.packageSemaphore = packageSemaphore; + } + + @Override + protected void processPartialResults( + Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + throws QueryException, InterruptedException { + Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = + env.makePackageKeyToTargetKeyMap(keysToUseForResult); + Set<PackageIdentifier> pkgIdsNeededForResult = + packageKeyToTargetKeyMap + .keySet() + .stream() + .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) + .collect(toImmutableSet()); + packageSemaphore.acquireAll(pkgIdsNeededForResult); + try { + callback.process( + env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values()); + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForResult); + } + } + + protected abstract SkyKey getRdepOfVisit(T visit); + + @Override + protected Iterable<Task> getVisitTasks(Collection<T> pendingVisits) { + // Group pending visitation by the package of the rdep, since we'll be targetfying the + // rdep during the visitation. + ListMultimap<PackageIdentifier, T> visitsByPackage = ArrayListMultimap.create(); + for (T visit : pendingVisits) { + Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(getRdepOfVisit(visit)); + if (label != null) { + visitsByPackage.put(label.getPackageIdentifier(), visit); + } + } + + ImmutableList.Builder<Task> builder = ImmutableList.builder(); + + // A couple notes here: + // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we + // want. + // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid + // accidentally retaining the entire ArrayListMultimap object. + for (Iterable<T> visitBatch : + Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) { + builder.add(new VisitTask(visitBatch)); + } + + return builder.build(); + } + } + /** - * A helper class that computes 'allrdeps(<blah>)' via BFS. + * A helper class that computes unbounded 'allrdeps(<expr>)' or + * 'rdeps(<precomputed-universe>, <expr>)' via BFS. * * <p>The visitor uses {@link DepAndRdep} to keep track the nodes to visit and avoid dealing with * targetification of reverse deps until they are needed. The rdep node itself is needed to filter @@ -202,10 +364,7 @@ public class ParallelSkyQueryUtils { * risk of OOMs. The additional memory usage should not be a large concern here, as even with 10M * edges, the memory overhead is around 160M, and the memory can be reclaimed by regular GC. */ - private static class AllRdepsUnboundedVisitor extends ParallelVisitor<DepAndRdep, Target> { - private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE; - private final SkyQueryEnvironment env; - private final MultisetSemaphore<PackageIdentifier> packageSemaphore; + private static class RdepsUnboundedVisitor extends AbstractRdepsVisitor<DepAndRdep> { /** * A {@link Uniquifier} for visitations. Solely used for {@link #getUniqueValues}, which * actually isn't that useful. See the method javadoc. @@ -215,25 +374,26 @@ public class ParallelSkyQueryUtils { * A {@link Uniquifier} for *valid* visitations of rdeps. {@code env}'s dependency filter might * mean that some rdep edges are invalid, meaning that any individual {@link DepAndRdep} * visitation may actually be invalid. Because the same rdep can be reached through more than - * one reverse edge, It'd be incorrectly to naively dedupe visitations solely based on the rdep. + * one reverse edge, it'd be incorrect to naively dedupe visitations solely based on the rdep. */ private final Uniquifier<SkyKey> validRdepUniquifier; + private final Predicate<SkyKey> universe; - private AllRdepsUnboundedVisitor( + private RdepsUnboundedVisitor( SkyQueryEnvironment env, Uniquifier<DepAndRdep> depAndRdepUniquifier, Uniquifier<SkyKey> validRdepUniquifier, + Predicate<SkyKey> universe, Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { - super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE); - this.env = env; + super(env, callback, packageSemaphore); this.depAndRdepUniquifier = depAndRdepUniquifier; this.validRdepUniquifier = validRdepUniquifier; - this.packageSemaphore = packageSemaphore; + this.universe = universe; } /** - * A {@link Factory} for {@link AllRdepsUnboundedVisitor} instances, each of which will be used + * A {@link Factory} for {@link RdepsUnboundedVisitor} instances, each of which will be used * to perform visitation of the reverse transitive closure of the {@link Target}s passed in a * single {@link Callback#process} call. Note that all the created instances share the same * {@link Uniquifier} so that we don't visit the same Skyframe node more than once. @@ -242,14 +402,17 @@ public class ParallelSkyQueryUtils { private final SkyQueryEnvironment env; private final Uniquifier<DepAndRdep> depAndRdepUniquifier; private final Uniquifier<SkyKey> validRdepUniquifier; + private final Predicate<SkyKey> universe; private final Callback<Target> callback; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private Factory( SkyQueryEnvironment env, + Predicate<SkyKey> universe, Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { this.env = env; + this.universe = universe; this.depAndRdepUniquifier = new UniquifierImpl<>(depAndRdep -> depAndRdep); this.validRdepUniquifier = env.createSkyKeyUniquifier(); this.callback = callback; @@ -258,31 +421,29 @@ public class ParallelSkyQueryUtils { @Override public ParallelVisitor<DepAndRdep, Target> create() { - return new AllRdepsUnboundedVisitor( - env, depAndRdepUniquifier, validRdepUniquifier, callback, packageSemaphore); + return new RdepsUnboundedVisitor( + env, depAndRdepUniquifier, validRdepUniquifier, universe, callback, packageSemaphore); } } @Override protected Visit getVisitResult(Iterable<DepAndRdep> depAndRdeps) throws InterruptedException { - Collection<SkyKey> filteredUniqueKeys = new ArrayList<>(); + Collection<SkyKey> validRdeps = new ArrayList<>(); - // Build a raw reverse dep map from pairs of SkyKeys to filter out the disallowed deps. - Map<SkyKey, Collection<SkyKey>> reverseDepsMap = Maps.newHashMap(); + // Multimap of dep to all the reverse deps in this visitation. Used to filter out the + // disallowed deps. + Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create(); for (DepAndRdep depAndRdep : depAndRdeps) { // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field. - if (depAndRdep.dep == null && validRdepUniquifier.unique(depAndRdep.rdep)) { - filteredUniqueKeys.add(depAndRdep.rdep); - continue; + if (depAndRdep.dep == null) { + validRdeps.add(depAndRdep.rdep); + } else { + reverseDepMultimap.put(depAndRdep.dep, depAndRdep.rdep); } - - reverseDepsMap.computeIfAbsent(depAndRdep.dep, k -> new LinkedList<SkyKey>()); - - reverseDepsMap.get(depAndRdep.dep).add(depAndRdep.rdep); } Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = - env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values())); + env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values())); Set<PackageIdentifier> pkgIdsNeededForTargetification = packageKeyToTargetKeyMap .keySet() @@ -294,95 +455,49 @@ public class ParallelSkyQueryUtils { try { // Filter out disallowed deps. We cannot defer the targetification any further as we do not // want to retrieve the rdeps of unwanted nodes (targets). - if (!reverseDepsMap.isEmpty()) { + if (!reverseDepMultimap.isEmpty()) { Collection<Target> filteredTargets = env.filterRawReverseDepsOfTransitiveTraversalKeys( - reverseDepsMap, packageKeyToTargetKeyMap); + reverseDepMultimap.asMap(), packageKeyToTargetKeyMap); filteredTargets .stream() .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY) - .forEachOrdered( - rdep -> { - if (validRdepUniquifier.unique(rdep)) { - filteredUniqueKeys.add(rdep); - } - }); - } + .forEachOrdered(validRdeps::add); + } } finally { packageSemaphore.releaseAll(pkgIdsNeededForTargetification); } + ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream() + .filter(validRdepUniquifier::unique) + .collect(ImmutableList.toImmutableList()); + // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next // recursive visitation. - Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps = - env.graph.getReverseDeps(filteredUniqueKeys); - - ImmutableList.Builder<DepAndRdep> builder = ImmutableList.builder(); - for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) { - for (SkyKey rdep : rdeps.getValue()) { - Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep); - if (label != null) { - builder.add(new DepAndRdep(rdeps.getKey(), rdep)); - } - } - } + ImmutableList.Builder<DepAndRdep> depAndRdepsToVisitBuilder = ImmutableList.builder(); + env.graph.getReverseDeps(uniqueValidRdeps).entrySet() + .forEach(reverseDepsEntry -> depAndRdepsToVisitBuilder.addAll( + Iterables.transform( + Iterables.filter( + reverseDepsEntry.getValue(), + Predicates.and(SkyQueryEnvironment.IS_TTV, universe)), + rdep -> new DepAndRdep(reverseDepsEntry.getKey(), rdep)))); return new Visit( - /*keysToUseForResult=*/ filteredUniqueKeys, /*keysToVisit=*/ builder.build()); - } - - @Override - protected void processPartialResults( - Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) - throws QueryException, InterruptedException { - Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = - env.makePackageKeyToTargetKeyMap(keysToUseForResult); - Set<PackageIdentifier> pkgIdsNeededForResult = - packageKeyToTargetKeyMap - .keySet() - .stream() - .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) - .collect(toImmutableSet()); - packageSemaphore.acquireAll(pkgIdsNeededForResult); - try { - callback.process( - env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values()); - } finally { - packageSemaphore.releaseAll(pkgIdsNeededForResult); - } + /*keysToUseForResult=*/ uniqueValidRdeps, + /*keysToVisit=*/ depAndRdepsToVisitBuilder.build()); } @Override protected Iterable<DepAndRdep> preprocessInitialVisit(Iterable<SkyKey> keys) { - return Iterables.transform(keys, key -> new DepAndRdep(null, key)); + return Iterables.transform( + Iterables.filter(keys, k -> universe.apply(k)), + key -> new DepAndRdep(null, key)); } @Override - protected Iterable<Task> getVisitTasks(Collection<DepAndRdep> pendingKeysToVisit) { - // Group pending (dep, rdep) visits by the package of the rdep, since we'll be targetfying the - // rdep during the visitation. - ListMultimap<PackageIdentifier, DepAndRdep> visitsByPackage = - ArrayListMultimap.create(); - for (DepAndRdep depAndRdep : pendingKeysToVisit) { - Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(depAndRdep.rdep); - if (label != null) { - visitsByPackage.put(label.getPackageIdentifier(), depAndRdep); - } - } - - ImmutableList.Builder<Task> builder = ImmutableList.builder(); - - // A couple notes here: - // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we - // want. - // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid - // accidentally retaining the entire ArrayListMultimap object. - for (Iterable<DepAndRdep> depAndRdepBatch : - Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) { - builder.add(new VisitTask(depAndRdepBatch)); - } - - return builder.build(); + protected SkyKey getRdepOfVisit(DepAndRdep visit) { + return visit.rdep; } @Override @@ -396,7 +511,7 @@ public class ParallelSkyQueryUtils { // Still, we include an implementation of 'getUniqueValues' that is correct in isolation so as // to not be depending on implementation details of 'ParallelVisitor'. // - // Even so, there's value in not visiting a rdep if it's already been visiting *validly* + // Even so, there's value in not visiting a rdep if it's already been visited *validly* // before. We use the intentionally racy {@link Uniquifier#uniquePure} to attempt to do this. return depAndRdepUniquifier.unique( Iterables.filter( @@ -404,5 +519,278 @@ public class ParallelSkyQueryUtils { depAndRdep -> validRdepUniquifier.uniquePure(depAndRdep.rdep))); } } + + private static class DepAndRdepAtDepth { + private final DepAndRdep depAndRdep; + private final int rdepDepth; + + private DepAndRdepAtDepth(DepAndRdep depAndRdep, int rdepDepth) { + this.depAndRdep = depAndRdep; + this.rdepDepth = rdepDepth; + } + } + + /** + * A helper class that computes bounded 'allrdeps(<expr>, <depth>)' or + * 'rdeps(<precomputed-universe>, <expr>, <depth>)' via BFS. + * + * <p>This is very similar to {@link RdepsUnboundedVisitor}. A lot of the same concerns apply here + * but there are additional subtle concerns about the correctness of the bounded traversal: just + * like for the sequential implementation of bounded allrdeps, we use {@link MinDepthUniquifier}. + */ + private static class RdepsBoundedVisitor extends AbstractRdepsVisitor<DepAndRdepAtDepth> { + private final int depth; + private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier; + private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier; + private final Predicate<SkyKey> universe; + + private RdepsBoundedVisitor( + SkyQueryEnvironment env, + int depth, + Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier, + MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier, + Predicate<SkyKey> universe, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + super(env, callback, packageSemaphore); + this.depth = depth; + this.depAndRdepAtDepthUniquifier = depAndRdepAtDepthUniquifier; + this.validRdepMinDepthUniquifier = validRdepMinDepthUniquifier; + this.universe = universe; + } + + private static class Factory implements ParallelVisitor.Factory { + private final SkyQueryEnvironment env; + private final int depth; + private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier; + private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier; + private final Predicate<SkyKey> universe; + private final Callback<Target> callback; + private final MultisetSemaphore<PackageIdentifier> packageSemaphore; + + private Factory( + SkyQueryEnvironment env, + int depth, + Predicate<SkyKey> universe, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + this.env = env; + this.depth = depth; + this.universe = universe; + this.depAndRdepAtDepthUniquifier = + new UniquifierImpl<>(depAndRdepAtDepth -> depAndRdepAtDepth); + this.validRdepMinDepthUniquifier = env.createMinDepthSkyKeyUniquifier(); + this.callback = callback; + this.packageSemaphore = packageSemaphore; + } + + @Override + public ParallelVisitor<DepAndRdepAtDepth, Target> create() { + return new RdepsBoundedVisitor( + env, + depth, + depAndRdepAtDepthUniquifier, + validRdepMinDepthUniquifier, + universe, + callback, + packageSemaphore); + } + } + + @Override + protected Visit getVisitResult(Iterable<DepAndRdepAtDepth> depAndRdepAtDepths) + throws InterruptedException { + Map<SkyKey, Integer> shallowestRdepDepthMap = new HashMap<>(); + depAndRdepAtDepths.forEach( + depAndRdepAtDepth -> shallowestRdepDepthMap.merge( + depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth, Integer::min)); + + Collection<SkyKey> validRdeps = new ArrayList<>(); + + // Multimap of dep to all the reverse deps in this visitation. Used to filter out the + // disallowed deps. + Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create(); + for (DepAndRdepAtDepth depAndRdepAtDepth : depAndRdepAtDepths) { + // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field. + if (depAndRdepAtDepth.depAndRdep.dep == null) { + validRdeps.add(depAndRdepAtDepth.depAndRdep.rdep); + } else { + reverseDepMultimap.put( + depAndRdepAtDepth.depAndRdep.dep, depAndRdepAtDepth.depAndRdep.rdep); + } + } + + Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = + env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values())); + Set<PackageIdentifier> pkgIdsNeededForTargetification = + packageKeyToTargetKeyMap + .keySet() + .stream() + .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) + .collect(toImmutableSet()); + packageSemaphore.acquireAll(pkgIdsNeededForTargetification); + + try { + // Filter out disallowed deps. We cannot defer the targetification any further as we do not + // want to retrieve the rdeps of unwanted nodes (targets). + if (!reverseDepMultimap.isEmpty()) { + Collection<Target> filteredTargets = + env.filterRawReverseDepsOfTransitiveTraversalKeys( + reverseDepMultimap.asMap(), packageKeyToTargetKeyMap); + filteredTargets + .stream() + .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY) + .forEachOrdered(validRdeps::add); + } + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForTargetification); + } + + ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream() + .filter(validRdep -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualTo( + validRdep, shallowestRdepDepthMap.get(validRdep))) + .collect(ImmutableList.toImmutableList()); + + // Don't bother getting the rdeps of the rdeps that are already at the depth bound. + Iterable<SkyKey> uniqueValidRdepsBelowDepthBound = Iterables.filter( + uniqueValidRdeps, + uniqueValidRdep -> shallowestRdepDepthMap.get(uniqueValidRdep) < depth); + + // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next + // recursive visitation. + Map<SkyKey, Iterable<SkyKey>> unfilteredRdepsOfRdeps = + env.graph.getReverseDeps(uniqueValidRdepsBelowDepthBound); + + ImmutableList.Builder<DepAndRdepAtDepth> depAndRdepAtDepthsToVisitBuilder = + ImmutableList.builder(); + unfilteredRdepsOfRdeps.entrySet().forEach(entry -> { + SkyKey rdep = entry.getKey(); + int depthOfRdepOfRdep = shallowestRdepDepthMap.get(rdep) + 1; + Streams.stream(entry.getValue()) + .filter(Predicates.and(SkyQueryEnvironment.IS_TTV, universe)) + .forEachOrdered(rdepOfRdep -> { + depAndRdepAtDepthsToVisitBuilder.add( + new DepAndRdepAtDepth(new DepAndRdep(rdep, rdepOfRdep), depthOfRdepOfRdep)); + }); + }); + + return new Visit( + /*keysToUseForResult=*/ uniqueValidRdeps, + /*keysToVisit=*/ depAndRdepAtDepthsToVisitBuilder.build()); + } + + @Override + protected Iterable<DepAndRdepAtDepth> preprocessInitialVisit(Iterable<SkyKey> keys) { + return Iterables.transform( + Iterables.filter(keys, k -> universe.apply(k)), + key -> new DepAndRdepAtDepth(new DepAndRdep(null, key), 0)); + } + + @Override + protected SkyKey getRdepOfVisit(DepAndRdepAtDepth visit) { + return visit.depAndRdep.rdep; + } + + @Override + protected ImmutableList<DepAndRdepAtDepth> getUniqueValues( + Iterable<DepAndRdepAtDepth> depAndRdepAtDepths) { + // See the comment in RdepsUnboundedVisitor#getUniqueValues. + return depAndRdepAtDepthUniquifier.unique( + Iterables.filter( + depAndRdepAtDepths, + depAndRdepAtDepth -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualToPure( + depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth))); + } + } + + /** Helper class that computes DTC in the form of {@link SkyKey} via BFS. */ + // TODO(nharmata): This should only be for the TTV-land DTC (i.e. only follow TTV -> TTV edges). + private static class TransitiveClosureVisitor extends ParallelVisitor<SkyKey, SkyKey> { + private final SkyQueryEnvironment env; + private final Uniquifier<SkyKey> uniquifier; + + private TransitiveClosureVisitor( + SkyQueryEnvironment env, + Uniquifier<SkyKey> uniquifier, + int processResultsBatchSize, + AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) { + super(aggregateAllCallback, VISIT_BATCH_SIZE, processResultsBatchSize); + this.env = env; + this.uniquifier = uniquifier; + } + + private static class Factory implements ParallelVisitor.Factory { + private final SkyQueryEnvironment env; + private final Uniquifier<SkyKey> uniquifier; + private final AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback; + private final int processResultsBatchSize; + + private Factory( + SkyQueryEnvironment env, + Uniquifier<SkyKey> uniquifier, + int processResultsBatchSize, + AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) { + this.env = env; + this.uniquifier = uniquifier; + this.processResultsBatchSize = processResultsBatchSize; + this.aggregateAllCallback = aggregateAllCallback; + } + + @Override + public ParallelVisitor<SkyKey, SkyKey> create() { + return new TransitiveClosureVisitor( + env, uniquifier, processResultsBatchSize, aggregateAllCallback); + } + } + + @Override + protected void processPartialResults( + Iterable<SkyKey> keysToUseForResult, Callback<SkyKey> callback) + throws QueryException, InterruptedException { + callback.process(keysToUseForResult); + } + + @Override + protected Visit getVisitResult(Iterable<SkyKey> values) throws InterruptedException { + Multimap<SkyKey, SkyKey> deps = env.getDirectDepsOfSkyKeys(values); + return new Visit( + /*keysToUseForResult=*/ deps.keySet(), + /*keysToVisit=*/ ImmutableSet.copyOf(deps.values())); + } + + @Override + protected Iterable<SkyKey> preprocessInitialVisit(Iterable<SkyKey> keys) { + return keys; + } + + @Override + protected ImmutableList<SkyKey> getUniqueValues(Iterable<SkyKey> values) { + return uniquifier.unique(values); + } + } + + /** Thread-safe {@link AggregateAllCallback} backed by a concurrent {@link Set}. */ + @ThreadSafe + private static class ThreadSafeAggregateAllSkyKeysCallback + implements AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> { + + private final Set<SkyKey> results; + + private ThreadSafeAggregateAllSkyKeysCallback(int concurrencyLevel) { + this.results = + Collections.newSetFromMap(new MapMaker().concurrencyLevel(concurrencyLevel).makeMap()); + } + + @Override + public void process(Iterable<SkyKey> partialResult) + throws QueryException, InterruptedException { + Iterables.addAll(results, partialResult); + } + + @Override + public ImmutableSet<SkyKey> getResult() { + return ImmutableSet.copyOf(results); + } + } } |