diff options
author | nharmata <nharmata@google.com> | 2018-04-12 15:31:26 -0700 |
---|---|---|
committer | Copybara-Service <copybara-piper@google.com> | 2018-04-12 15:33:05 -0700 |
commit | 398e6dab1092740e38a4ff8657a2f8dee9ee7c20 (patch) | |
tree | 09da67203e0d378c015f785958c5c9f7f7dfe813 /src/main/java/com/google/devtools/build/lib/query2 | |
parent | 06cb84dcf9a0decdd8ff74464137ccd48eb9646e (diff) |
Provide parallel implementations of bounded allrdeps and rdeps.
RELNOTES: None
PiperOrigin-RevId: 192681579
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2')
8 files changed, 695 insertions, 352 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 585b39d695..e0a09851fa 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -16,21 +16,31 @@ package com.google.devtools.build.lib.query2; import static com.google.common.collect.ImmutableSet.toImmutableSet; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; -import com.google.common.collect.Maps; +import com.google.common.collect.MapMaker; import com.google.common.collect.Multimap; +import com.google.common.collect.Streams; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; +import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ThreadSafeMutableSet; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; +import com.google.devtools.build.lib.query2.engine.QueryUtil; +import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback; import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl; import com.google.devtools.build.lib.query2.engine.Uniquifier; import com.google.devtools.build.lib.query2.engine.VariableContext; @@ -40,7 +50,8 @@ import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.skyframe.SkyKey; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -63,10 +74,6 @@ public class ParallelSkyQueryUtils { private ParallelSkyQueryUtils() { } - /** - * Specialized parallel variant of {@link SkyQueryEnvironment#getAllRdeps} that is appropriate - * when there is no depth-bound. - */ static QueryTaskFuture<Void> getAllRdepsUnboundedParallel( SkyQueryEnvironment env, QueryExpression expression, @@ -77,7 +84,95 @@ public class ParallelSkyQueryUtils { expression, context, ParallelVisitor.createParallelVisitorCallback( - new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore))); + new RdepsUnboundedVisitor.Factory( + env, + /*universe=*/ Predicates.alwaysTrue(), + callback, + packageSemaphore))); + } + + static QueryTaskFuture<Void> getAllRdepsBoundedParallel( + SkyQueryEnvironment env, + QueryExpression expression, + int depth, + VariableContext<Target> context, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + return env.eval( + expression, + context, + ParallelVisitor.createParallelVisitorCallback( + new RdepsBoundedVisitor.Factory( + env, + depth, + /*universe=*/ Predicates.alwaysTrue(), + callback, + packageSemaphore))); + } + + static QueryTaskFuture<Void> getRdepsInUniverseUnboundedParallel( + SkyQueryEnvironment env, + QueryExpression expression, + Predicate<SkyKey> universe, + VariableContext<Target> context, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + return env.eval( + expression, + context, + ParallelVisitor.createParallelVisitorCallback( + new RdepsUnboundedVisitor.Factory(env, universe, callback, packageSemaphore))); + } + + static QueryTaskFuture<Predicate<SkyKey>> getDTCSkyKeyPredicateFuture( + SkyQueryEnvironment env, + QueryExpression expression, + VariableContext<Target> context, + int processResultsBatchSize, + int concurrencyLevel) { + QueryTaskFuture<ThreadSafeMutableSet<Target>> universeValueFuture = + QueryUtil.evalAll(env, context, expression); + + Function<ThreadSafeMutableSet<Target>, QueryTaskFuture<Predicate<SkyKey>>> + getTransitiveClosureAsyncFunction = + universeValue -> { + ThreadSafeAggregateAllSkyKeysCallback aggregateAllCallback = + new ThreadSafeAggregateAllSkyKeysCallback(concurrencyLevel); + return env.executeAsync( + () -> { + Callback<Target> visitorCallback = + ParallelVisitor.createParallelVisitorCallback( + new TransitiveClosureVisitor.Factory( + env, + env.createSkyKeyUniquifier(), + processResultsBatchSize, + aggregateAllCallback)); + visitorCallback.process(universeValue); + return Predicates.in(aggregateAllCallback.getResult()); + }); + }; + + return env.transformAsync(universeValueFuture, getTransitiveClosureAsyncFunction); + } + + static QueryTaskFuture<Void> getRdepsInUniverseBoundedParallel( + SkyQueryEnvironment env, + QueryExpression expression, + int depth, + Predicate<SkyKey> universe, + VariableContext<Target> context, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + return env.eval( + expression, + context, + ParallelVisitor.createParallelVisitorCallback( + new RdepsBoundedVisitor.Factory( + env, + depth, + universe, + callback, + packageSemaphore))); } /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */ @@ -191,8 +286,75 @@ public class ParallelSkyQueryUtils { } } + private abstract static class AbstractRdepsVisitor<T> extends ParallelVisitor<T, Target> { + private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE; + + protected final SkyQueryEnvironment env; + protected final MultisetSemaphore<PackageIdentifier> packageSemaphore; + + protected AbstractRdepsVisitor( + SkyQueryEnvironment env, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE); + this.env = env; + this.packageSemaphore = packageSemaphore; + } + + @Override + protected void processPartialResults( + Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + throws QueryException, InterruptedException { + Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = + env.makePackageKeyToTargetKeyMap(keysToUseForResult); + Set<PackageIdentifier> pkgIdsNeededForResult = + packageKeyToTargetKeyMap + .keySet() + .stream() + .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) + .collect(toImmutableSet()); + packageSemaphore.acquireAll(pkgIdsNeededForResult); + try { + callback.process( + env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values()); + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForResult); + } + } + + protected abstract SkyKey getRdepOfVisit(T visit); + + @Override + protected Iterable<Task> getVisitTasks(Collection<T> pendingVisits) { + // Group pending visitation by the package of the rdep, since we'll be targetfying the + // rdep during the visitation. + ListMultimap<PackageIdentifier, T> visitsByPackage = ArrayListMultimap.create(); + for (T visit : pendingVisits) { + Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(getRdepOfVisit(visit)); + if (label != null) { + visitsByPackage.put(label.getPackageIdentifier(), visit); + } + } + + ImmutableList.Builder<Task> builder = ImmutableList.builder(); + + // A couple notes here: + // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we + // want. + // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid + // accidentally retaining the entire ArrayListMultimap object. + for (Iterable<T> visitBatch : + Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) { + builder.add(new VisitTask(visitBatch)); + } + + return builder.build(); + } + } + /** - * A helper class that computes 'allrdeps(<blah>)' via BFS. + * A helper class that computes unbounded 'allrdeps(<expr>)' or + * 'rdeps(<precomputed-universe>, <expr>)' via BFS. * * <p>The visitor uses {@link DepAndRdep} to keep track the nodes to visit and avoid dealing with * targetification of reverse deps until they are needed. The rdep node itself is needed to filter @@ -202,10 +364,7 @@ public class ParallelSkyQueryUtils { * risk of OOMs. The additional memory usage should not be a large concern here, as even with 10M * edges, the memory overhead is around 160M, and the memory can be reclaimed by regular GC. */ - private static class AllRdepsUnboundedVisitor extends ParallelVisitor<DepAndRdep, Target> { - private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE; - private final SkyQueryEnvironment env; - private final MultisetSemaphore<PackageIdentifier> packageSemaphore; + private static class RdepsUnboundedVisitor extends AbstractRdepsVisitor<DepAndRdep> { /** * A {@link Uniquifier} for visitations. Solely used for {@link #getUniqueValues}, which * actually isn't that useful. See the method javadoc. @@ -215,25 +374,26 @@ public class ParallelSkyQueryUtils { * A {@link Uniquifier} for *valid* visitations of rdeps. {@code env}'s dependency filter might * mean that some rdep edges are invalid, meaning that any individual {@link DepAndRdep} * visitation may actually be invalid. Because the same rdep can be reached through more than - * one reverse edge, It'd be incorrectly to naively dedupe visitations solely based on the rdep. + * one reverse edge, it'd be incorrect to naively dedupe visitations solely based on the rdep. */ private final Uniquifier<SkyKey> validRdepUniquifier; + private final Predicate<SkyKey> universe; - private AllRdepsUnboundedVisitor( + private RdepsUnboundedVisitor( SkyQueryEnvironment env, Uniquifier<DepAndRdep> depAndRdepUniquifier, Uniquifier<SkyKey> validRdepUniquifier, + Predicate<SkyKey> universe, Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { - super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE); - this.env = env; + super(env, callback, packageSemaphore); this.depAndRdepUniquifier = depAndRdepUniquifier; this.validRdepUniquifier = validRdepUniquifier; - this.packageSemaphore = packageSemaphore; + this.universe = universe; } /** - * A {@link Factory} for {@link AllRdepsUnboundedVisitor} instances, each of which will be used + * A {@link Factory} for {@link RdepsUnboundedVisitor} instances, each of which will be used * to perform visitation of the reverse transitive closure of the {@link Target}s passed in a * single {@link Callback#process} call. Note that all the created instances share the same * {@link Uniquifier} so that we don't visit the same Skyframe node more than once. @@ -242,14 +402,17 @@ public class ParallelSkyQueryUtils { private final SkyQueryEnvironment env; private final Uniquifier<DepAndRdep> depAndRdepUniquifier; private final Uniquifier<SkyKey> validRdepUniquifier; + private final Predicate<SkyKey> universe; private final Callback<Target> callback; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private Factory( SkyQueryEnvironment env, + Predicate<SkyKey> universe, Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { this.env = env; + this.universe = universe; this.depAndRdepUniquifier = new UniquifierImpl<>(depAndRdep -> depAndRdep); this.validRdepUniquifier = env.createSkyKeyUniquifier(); this.callback = callback; @@ -258,31 +421,29 @@ public class ParallelSkyQueryUtils { @Override public ParallelVisitor<DepAndRdep, Target> create() { - return new AllRdepsUnboundedVisitor( - env, depAndRdepUniquifier, validRdepUniquifier, callback, packageSemaphore); + return new RdepsUnboundedVisitor( + env, depAndRdepUniquifier, validRdepUniquifier, universe, callback, packageSemaphore); } } @Override protected Visit getVisitResult(Iterable<DepAndRdep> depAndRdeps) throws InterruptedException { - Collection<SkyKey> filteredUniqueKeys = new ArrayList<>(); + Collection<SkyKey> validRdeps = new ArrayList<>(); - // Build a raw reverse dep map from pairs of SkyKeys to filter out the disallowed deps. - Map<SkyKey, Collection<SkyKey>> reverseDepsMap = Maps.newHashMap(); + // Multimap of dep to all the reverse deps in this visitation. Used to filter out the + // disallowed deps. + Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create(); for (DepAndRdep depAndRdep : depAndRdeps) { // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field. - if (depAndRdep.dep == null && validRdepUniquifier.unique(depAndRdep.rdep)) { - filteredUniqueKeys.add(depAndRdep.rdep); - continue; + if (depAndRdep.dep == null) { + validRdeps.add(depAndRdep.rdep); + } else { + reverseDepMultimap.put(depAndRdep.dep, depAndRdep.rdep); } - - reverseDepsMap.computeIfAbsent(depAndRdep.dep, k -> new LinkedList<SkyKey>()); - - reverseDepsMap.get(depAndRdep.dep).add(depAndRdep.rdep); } Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = - env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values())); + env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values())); Set<PackageIdentifier> pkgIdsNeededForTargetification = packageKeyToTargetKeyMap .keySet() @@ -294,95 +455,49 @@ public class ParallelSkyQueryUtils { try { // Filter out disallowed deps. We cannot defer the targetification any further as we do not // want to retrieve the rdeps of unwanted nodes (targets). - if (!reverseDepsMap.isEmpty()) { + if (!reverseDepMultimap.isEmpty()) { Collection<Target> filteredTargets = env.filterRawReverseDepsOfTransitiveTraversalKeys( - reverseDepsMap, packageKeyToTargetKeyMap); + reverseDepMultimap.asMap(), packageKeyToTargetKeyMap); filteredTargets .stream() .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY) - .forEachOrdered( - rdep -> { - if (validRdepUniquifier.unique(rdep)) { - filteredUniqueKeys.add(rdep); - } - }); - } + .forEachOrdered(validRdeps::add); + } } finally { packageSemaphore.releaseAll(pkgIdsNeededForTargetification); } + ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream() + .filter(validRdepUniquifier::unique) + .collect(ImmutableList.toImmutableList()); + // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next // recursive visitation. - Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps = - env.graph.getReverseDeps(filteredUniqueKeys); - - ImmutableList.Builder<DepAndRdep> builder = ImmutableList.builder(); - for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) { - for (SkyKey rdep : rdeps.getValue()) { - Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep); - if (label != null) { - builder.add(new DepAndRdep(rdeps.getKey(), rdep)); - } - } - } + ImmutableList.Builder<DepAndRdep> depAndRdepsToVisitBuilder = ImmutableList.builder(); + env.graph.getReverseDeps(uniqueValidRdeps).entrySet() + .forEach(reverseDepsEntry -> depAndRdepsToVisitBuilder.addAll( + Iterables.transform( + Iterables.filter( + reverseDepsEntry.getValue(), + Predicates.and(SkyQueryEnvironment.IS_TTV, universe)), + rdep -> new DepAndRdep(reverseDepsEntry.getKey(), rdep)))); return new Visit( - /*keysToUseForResult=*/ filteredUniqueKeys, /*keysToVisit=*/ builder.build()); - } - - @Override - protected void processPartialResults( - Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) - throws QueryException, InterruptedException { - Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = - env.makePackageKeyToTargetKeyMap(keysToUseForResult); - Set<PackageIdentifier> pkgIdsNeededForResult = - packageKeyToTargetKeyMap - .keySet() - .stream() - .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) - .collect(toImmutableSet()); - packageSemaphore.acquireAll(pkgIdsNeededForResult); - try { - callback.process( - env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values()); - } finally { - packageSemaphore.releaseAll(pkgIdsNeededForResult); - } + /*keysToUseForResult=*/ uniqueValidRdeps, + /*keysToVisit=*/ depAndRdepsToVisitBuilder.build()); } @Override protected Iterable<DepAndRdep> preprocessInitialVisit(Iterable<SkyKey> keys) { - return Iterables.transform(keys, key -> new DepAndRdep(null, key)); + return Iterables.transform( + Iterables.filter(keys, k -> universe.apply(k)), + key -> new DepAndRdep(null, key)); } @Override - protected Iterable<Task> getVisitTasks(Collection<DepAndRdep> pendingKeysToVisit) { - // Group pending (dep, rdep) visits by the package of the rdep, since we'll be targetfying the - // rdep during the visitation. - ListMultimap<PackageIdentifier, DepAndRdep> visitsByPackage = - ArrayListMultimap.create(); - for (DepAndRdep depAndRdep : pendingKeysToVisit) { - Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(depAndRdep.rdep); - if (label != null) { - visitsByPackage.put(label.getPackageIdentifier(), depAndRdep); - } - } - - ImmutableList.Builder<Task> builder = ImmutableList.builder(); - - // A couple notes here: - // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we - // want. - // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid - // accidentally retaining the entire ArrayListMultimap object. - for (Iterable<DepAndRdep> depAndRdepBatch : - Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) { - builder.add(new VisitTask(depAndRdepBatch)); - } - - return builder.build(); + protected SkyKey getRdepOfVisit(DepAndRdep visit) { + return visit.rdep; } @Override @@ -396,7 +511,7 @@ public class ParallelSkyQueryUtils { // Still, we include an implementation of 'getUniqueValues' that is correct in isolation so as // to not be depending on implementation details of 'ParallelVisitor'. // - // Even so, there's value in not visiting a rdep if it's already been visiting *validly* + // Even so, there's value in not visiting a rdep if it's already been visited *validly* // before. We use the intentionally racy {@link Uniquifier#uniquePure} to attempt to do this. return depAndRdepUniquifier.unique( Iterables.filter( @@ -404,5 +519,278 @@ public class ParallelSkyQueryUtils { depAndRdep -> validRdepUniquifier.uniquePure(depAndRdep.rdep))); } } + + private static class DepAndRdepAtDepth { + private final DepAndRdep depAndRdep; + private final int rdepDepth; + + private DepAndRdepAtDepth(DepAndRdep depAndRdep, int rdepDepth) { + this.depAndRdep = depAndRdep; + this.rdepDepth = rdepDepth; + } + } + + /** + * A helper class that computes bounded 'allrdeps(<expr>, <depth>)' or + * 'rdeps(<precomputed-universe>, <expr>, <depth>)' via BFS. + * + * <p>This is very similar to {@link RdepsUnboundedVisitor}. A lot of the same concerns apply here + * but there are additional subtle concerns about the correctness of the bounded traversal: just + * like for the sequential implementation of bounded allrdeps, we use {@link MinDepthUniquifier}. + */ + private static class RdepsBoundedVisitor extends AbstractRdepsVisitor<DepAndRdepAtDepth> { + private final int depth; + private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier; + private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier; + private final Predicate<SkyKey> universe; + + private RdepsBoundedVisitor( + SkyQueryEnvironment env, + int depth, + Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier, + MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier, + Predicate<SkyKey> universe, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + super(env, callback, packageSemaphore); + this.depth = depth; + this.depAndRdepAtDepthUniquifier = depAndRdepAtDepthUniquifier; + this.validRdepMinDepthUniquifier = validRdepMinDepthUniquifier; + this.universe = universe; + } + + private static class Factory implements ParallelVisitor.Factory { + private final SkyQueryEnvironment env; + private final int depth; + private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier; + private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier; + private final Predicate<SkyKey> universe; + private final Callback<Target> callback; + private final MultisetSemaphore<PackageIdentifier> packageSemaphore; + + private Factory( + SkyQueryEnvironment env, + int depth, + Predicate<SkyKey> universe, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + this.env = env; + this.depth = depth; + this.universe = universe; + this.depAndRdepAtDepthUniquifier = + new UniquifierImpl<>(depAndRdepAtDepth -> depAndRdepAtDepth); + this.validRdepMinDepthUniquifier = env.createMinDepthSkyKeyUniquifier(); + this.callback = callback; + this.packageSemaphore = packageSemaphore; + } + + @Override + public ParallelVisitor<DepAndRdepAtDepth, Target> create() { + return new RdepsBoundedVisitor( + env, + depth, + depAndRdepAtDepthUniquifier, + validRdepMinDepthUniquifier, + universe, + callback, + packageSemaphore); + } + } + + @Override + protected Visit getVisitResult(Iterable<DepAndRdepAtDepth> depAndRdepAtDepths) + throws InterruptedException { + Map<SkyKey, Integer> shallowestRdepDepthMap = new HashMap<>(); + depAndRdepAtDepths.forEach( + depAndRdepAtDepth -> shallowestRdepDepthMap.merge( + depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth, Integer::min)); + + Collection<SkyKey> validRdeps = new ArrayList<>(); + + // Multimap of dep to all the reverse deps in this visitation. Used to filter out the + // disallowed deps. + Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create(); + for (DepAndRdepAtDepth depAndRdepAtDepth : depAndRdepAtDepths) { + // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field. + if (depAndRdepAtDepth.depAndRdep.dep == null) { + validRdeps.add(depAndRdepAtDepth.depAndRdep.rdep); + } else { + reverseDepMultimap.put( + depAndRdepAtDepth.depAndRdep.dep, depAndRdepAtDepth.depAndRdep.rdep); + } + } + + Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = + env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values())); + Set<PackageIdentifier> pkgIdsNeededForTargetification = + packageKeyToTargetKeyMap + .keySet() + .stream() + .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) + .collect(toImmutableSet()); + packageSemaphore.acquireAll(pkgIdsNeededForTargetification); + + try { + // Filter out disallowed deps. We cannot defer the targetification any further as we do not + // want to retrieve the rdeps of unwanted nodes (targets). + if (!reverseDepMultimap.isEmpty()) { + Collection<Target> filteredTargets = + env.filterRawReverseDepsOfTransitiveTraversalKeys( + reverseDepMultimap.asMap(), packageKeyToTargetKeyMap); + filteredTargets + .stream() + .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY) + .forEachOrdered(validRdeps::add); + } + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForTargetification); + } + + ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream() + .filter(validRdep -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualTo( + validRdep, shallowestRdepDepthMap.get(validRdep))) + .collect(ImmutableList.toImmutableList()); + + // Don't bother getting the rdeps of the rdeps that are already at the depth bound. + Iterable<SkyKey> uniqueValidRdepsBelowDepthBound = Iterables.filter( + uniqueValidRdeps, + uniqueValidRdep -> shallowestRdepDepthMap.get(uniqueValidRdep) < depth); + + // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next + // recursive visitation. + Map<SkyKey, Iterable<SkyKey>> unfilteredRdepsOfRdeps = + env.graph.getReverseDeps(uniqueValidRdepsBelowDepthBound); + + ImmutableList.Builder<DepAndRdepAtDepth> depAndRdepAtDepthsToVisitBuilder = + ImmutableList.builder(); + unfilteredRdepsOfRdeps.entrySet().forEach(entry -> { + SkyKey rdep = entry.getKey(); + int depthOfRdepOfRdep = shallowestRdepDepthMap.get(rdep) + 1; + Streams.stream(entry.getValue()) + .filter(Predicates.and(SkyQueryEnvironment.IS_TTV, universe)) + .forEachOrdered(rdepOfRdep -> { + depAndRdepAtDepthsToVisitBuilder.add( + new DepAndRdepAtDepth(new DepAndRdep(rdep, rdepOfRdep), depthOfRdepOfRdep)); + }); + }); + + return new Visit( + /*keysToUseForResult=*/ uniqueValidRdeps, + /*keysToVisit=*/ depAndRdepAtDepthsToVisitBuilder.build()); + } + + @Override + protected Iterable<DepAndRdepAtDepth> preprocessInitialVisit(Iterable<SkyKey> keys) { + return Iterables.transform( + Iterables.filter(keys, k -> universe.apply(k)), + key -> new DepAndRdepAtDepth(new DepAndRdep(null, key), 0)); + } + + @Override + protected SkyKey getRdepOfVisit(DepAndRdepAtDepth visit) { + return visit.depAndRdep.rdep; + } + + @Override + protected ImmutableList<DepAndRdepAtDepth> getUniqueValues( + Iterable<DepAndRdepAtDepth> depAndRdepAtDepths) { + // See the comment in RdepsUnboundedVisitor#getUniqueValues. + return depAndRdepAtDepthUniquifier.unique( + Iterables.filter( + depAndRdepAtDepths, + depAndRdepAtDepth -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualToPure( + depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth))); + } + } + + /** Helper class that computes DTC in the form of {@link SkyKey} via BFS. */ + // TODO(nharmata): This should only be for the TTV-land DTC (i.e. only follow TTV -> TTV edges). + private static class TransitiveClosureVisitor extends ParallelVisitor<SkyKey, SkyKey> { + private final SkyQueryEnvironment env; + private final Uniquifier<SkyKey> uniquifier; + + private TransitiveClosureVisitor( + SkyQueryEnvironment env, + Uniquifier<SkyKey> uniquifier, + int processResultsBatchSize, + AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) { + super(aggregateAllCallback, VISIT_BATCH_SIZE, processResultsBatchSize); + this.env = env; + this.uniquifier = uniquifier; + } + + private static class Factory implements ParallelVisitor.Factory { + private final SkyQueryEnvironment env; + private final Uniquifier<SkyKey> uniquifier; + private final AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback; + private final int processResultsBatchSize; + + private Factory( + SkyQueryEnvironment env, + Uniquifier<SkyKey> uniquifier, + int processResultsBatchSize, + AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) { + this.env = env; + this.uniquifier = uniquifier; + this.processResultsBatchSize = processResultsBatchSize; + this.aggregateAllCallback = aggregateAllCallback; + } + + @Override + public ParallelVisitor<SkyKey, SkyKey> create() { + return new TransitiveClosureVisitor( + env, uniquifier, processResultsBatchSize, aggregateAllCallback); + } + } + + @Override + protected void processPartialResults( + Iterable<SkyKey> keysToUseForResult, Callback<SkyKey> callback) + throws QueryException, InterruptedException { + callback.process(keysToUseForResult); + } + + @Override + protected Visit getVisitResult(Iterable<SkyKey> values) throws InterruptedException { + Multimap<SkyKey, SkyKey> deps = env.getDirectDepsOfSkyKeys(values); + return new Visit( + /*keysToUseForResult=*/ deps.keySet(), + /*keysToVisit=*/ ImmutableSet.copyOf(deps.values())); + } + + @Override + protected Iterable<SkyKey> preprocessInitialVisit(Iterable<SkyKey> keys) { + return keys; + } + + @Override + protected ImmutableList<SkyKey> getUniqueValues(Iterable<SkyKey> values) { + return uniquifier.unique(values); + } + } + + /** Thread-safe {@link AggregateAllCallback} backed by a concurrent {@link Set}. */ + @ThreadSafe + private static class ThreadSafeAggregateAllSkyKeysCallback + implements AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> { + + private final Set<SkyKey> results; + + private ThreadSafeAggregateAllSkyKeysCallback(int concurrencyLevel) { + this.results = + Collections.newSetFromMap(new MapMaker().concurrencyLevel(concurrencyLevel).makeMap()); + } + + @Override + public void process(Iterable<SkyKey> partialResult) + throws QueryException, InterruptedException { + Iterables.addAll(results, partialResult); + } + + @Override + public ImmutableSet<SkyKey> getResult() { + return ImmutableSet.copyOf(results); + } + } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java index bd20259da4..3ea72273a8 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java @@ -15,7 +15,6 @@ package com.google.devtools.build.lib.query2; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Streams; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.concurrent.BlockingStack; @@ -153,8 +152,7 @@ public abstract class ParallelVisitor<T, V> { void visitAndWaitForCompletion(Iterable<SkyKey> keys) throws QueryException, InterruptedException { - Streams.stream(getUniqueValues(preprocessInitialVisit(keys))).forEachOrdered( - processingQueue::add); + getUniqueValues(preprocessInitialVisit(keys)).forEach(processingQueue::add); executor.visitAndWaitForCompletion(); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 8b95172cc4..b13ed4750c 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -16,7 +16,6 @@ package com.google.devtools.build.lib.query2; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Ascii; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -27,6 +26,7 @@ import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Iterables; @@ -112,11 +112,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Queue; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.RejectedExecutionException; @@ -506,6 +504,21 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> return result; } + /** + * Returns deps in the form of {@link SkyKey}s. + * + * <p>The implementation of this method does not filter deps, therefore it is expected to be used + * only when {@link SkyQueryEnvironment#dependencyFilter} is set to {@link + * DependencyFilter#ALL_DEPS}. + */ + Multimap<SkyKey, SkyKey> getDirectDepsOfSkyKeys(Iterable<SkyKey> keys) + throws InterruptedException { + Preconditions.checkState(dependencyFilter == DependencyFilter.ALL_DEPS, dependencyFilter); + ImmutableMultimap.Builder<SkyKey, SkyKey> builder = ImmutableMultimap.builder(); + graph.getDirectDeps(keys).forEach(builder::putAll); + return builder.build(); + } + @Override public Collection<Target> getReverseDeps(Iterable<Target> targets) throws InterruptedException { return getReverseDepsOfTransitiveTraversalKeys(Iterables.transform(targets, TARGET_TO_SKY_KEY)); @@ -648,6 +661,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } @ThreadSafe + protected MinDepthUniquifier<SkyKey> createMinDepthSkyKeyUniquifier() { + return new MinDepthUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT); + } + + @ThreadSafe Uniquifier<Target> createTargetUniquifier() { return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT); } @@ -894,15 +912,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> // so no preloading of target patterns is necessary. } + static final Predicate<SkyKey> IS_TTV = SkyFunctionName.functionIs(Label.TRANSITIVE_TRAVERSAL); + static final Function<SkyKey, Label> SKYKEY_TO_LABEL = - skyKey -> { - SkyFunctionName functionName = skyKey.functionName(); - if (!functionName.equals(Label.TRANSITIVE_TRAVERSAL)) { - // Skip non-targets. - return null; - } - return (Label) skyKey.argument(); - }; + skyKey -> IS_TTV.apply(skyKey) ? (Label) skyKey.argument() : null; + static final Function<SkyKey, PackageIdentifier> PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER = skyKey -> (PackageIdentifier) skyKey.argument(); @@ -1266,176 +1280,53 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> this, expression, context, callback, packageSemaphore); } + @ThreadSafe @Override - public QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel( + public QueryTaskFuture<Void> getAllRdepsBoundedParallel( QueryExpression expression, + int depth, VariableContext<Target> context, - List<Argument> args, Callback<Target> callback) { - return RdepsFunction.evalWithBoundedDepth(this, context, expression, args, callback); + return ParallelSkyQueryUtils.getAllRdepsBoundedParallel( + this, expression, depth, context, callback, packageSemaphore); + } + + protected QueryTaskFuture<Predicate<SkyKey>> getUniverseDTCSkyKeyPredicateFuture( + QueryExpression universe, + VariableContext<Target> context) { + return ParallelSkyQueryUtils.getDTCSkyKeyPredicateFuture( + this, + universe, + context, + BATCH_CALLBACK_SIZE, + DEFAULT_THREAD_COUNT); } @ThreadSafe @Override - public QueryTaskFuture<Void> getAllRdeps( + public QueryTaskFuture<Void> getRdepsUnboundedParallel( QueryExpression expression, - Predicate<Target> universe, + QueryExpression universe, VariableContext<Target> context, - Callback<Target> callback, - int depth) { - return getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE); + Callback<Target> callback) { + return transformAsync( + getUniverseDTCSkyKeyPredicateFuture(universe, context), + universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseUnboundedParallel( + this, expression, universePredicate, context, callback, packageSemaphore)); } - /** - * Computes and applies the callback to the reverse dependencies of the expression. - * - * <p>Batch size is used to only populate at most N targets at one time, because some individual - * nodes are directly depended on by a large number of other nodes. - */ - @VisibleForTesting - protected QueryTaskFuture<Void> getAllRdeps( + @ThreadSafe + @Override + public QueryTaskFuture<Void> getRdepsBoundedParallel( QueryExpression expression, - Predicate<Target> universe, - VariableContext<Target> context, - Callback<Target> callback, int depth, - int batchSize) { - MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier(); - return eval( - expression, - context, - new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize)); - } - - private class BatchAllRdepsCallback implements Callback<Target> { - private final MinDepthUniquifier<Target> minDepthUniquifier; - private final Predicate<Target> universe; - private final Callback<Target> callback; - private final int depth; - private final int batchSize; - - private BatchAllRdepsCallback( - MinDepthUniquifier<Target> minDepthUniquifier, - Predicate<Target> universe, - Callback<Target> callback, - int depth, - int batchSize) { - this.minDepthUniquifier = minDepthUniquifier; - this.universe = universe; - this.callback = callback; - this.depth = depth; - this.batchSize = batchSize; - } - - @Override - public void process(Iterable<Target> targets) throws QueryException, InterruptedException { - Iterable<Target> currentInUniverse = Iterables.filter(targets, universe); - ImmutableList<Target> uniqueTargets = - minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, 0); - callback.process(uniqueTargets); - - // Maintain a queue to allow tracking rdep relationships in BFS order. Rdeps are stored - // as 1:N SkyKey mappings instead of fully populated Targets to save memory. Targets - // have a reference to their entire Package, which is really memory expensive. - Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue = new LinkedList<>(); - reverseDepsQueue.addAll( - graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueTargets)).entrySet()); - - // In each iteration, we populate a size-limited (no more than batchSize) number of - // SkyKey mappings to targets, and append the SkyKey rdeps mappings to the queue. Once - // processed by callback, the targets are dequeued and not referenced any more, making - // them available for garbage collection. - - for (int curDepth = 1; curDepth <= depth; curDepth++) { - // The mappings between nodes and their reverse deps must be preserved instead of the - // reverse deps alone. Later when deserializing dependent nodes using SkyKeys, we need to - // check if their allowed deps contain the dependencies. - Map<SkyKey, Iterable<SkyKey>> reverseDepsMap = Maps.newHashMap(); - int batch = 0; // Tracking the current total number of rdeps in reverseDepsMap. - int processed = 0; - // Save current size as when we are process nodes in the current level, new mappings (for - // the next level) are added to the queue. - int size = reverseDepsQueue.size(); - while (processed < size) { - // We always peek the first element in the queue without polling it, to determine if - // adding it to the pending list will break the limit of max size. If yes then we process - // and empty the pending list first, and poll the element in the next iteration. - Map.Entry<SkyKey, Iterable<SkyKey>> entry = reverseDepsQueue.peek(); - - // The value of the entry is either a CompactHashSet or ImmutableList, which can return - // the size in O(1) time. - int rdepsSize = Iterables.size(entry.getValue()); - if (rdepsSize == 0) { - reverseDepsQueue.poll(); - processed++; - continue; - } - - if ((rdepsSize + batch <= batchSize)) { - // If current size is less than batch size, dequeue the node, update the current - // batch size and map. - reverseDepsMap.put(entry.getKey(), entry.getValue()); - batch += rdepsSize; - reverseDepsQueue.poll(); - processed++; - } else { - if (batch == 0) { - // The (single) node has more rdeps than the limit, divide them up to process - // separately. - for (Iterable<SkyKey> subList : Iterables.partition(entry.getValue(), batchSize)) { - reverseDepsMap.put(entry.getKey(), subList); - processReverseDepsMap( - minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth); - } - - reverseDepsQueue.poll(); - processed++; - } else { - // There are some nodes in the pending process list. Process them first and come - // back to this node later (in next iteration). - processReverseDepsMap( - minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth); - batch = 0; - } - } - } - - if (!reverseDepsMap.isEmpty()) { - processReverseDepsMap( - minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth); - } - - // If the queue is empty after all nodes in the current level are processed, stop - // processing as there are no more reverse deps. - if (reverseDepsQueue.isEmpty()) { - break; - } - } - } - - /** - * Populates {@link Target}s from reverse dep mappings of {@link SkyKey}s, empties the pending - * list and add next level reverse dep mappings of {@link SkyKey}s to the queue. - */ - private void processReverseDepsMap( - MinDepthUniquifier<Target> minDepthUniquifier, - Map<SkyKey, Iterable<SkyKey>> reverseDepsMap, - Callback<Target> callback, - Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue, - int depth) - throws QueryException, InterruptedException { - Collection<Target> children = processRawReverseDeps(targetifyValues(reverseDepsMap)); - Iterable<Target> currentInUniverse = Iterables.filter(children, universe); - ImmutableList<Target> uniqueChildren = - minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, depth); - reverseDepsMap.clear(); - - if (!uniqueChildren.isEmpty()) { - callback.process(uniqueChildren); - reverseDepsQueue.addAll( - graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueChildren)).entrySet()); - } - } + QueryExpression universe, + VariableContext<Target> context, + Callback<Target> callback) { + return transformAsync( + getUniverseDTCSkyKeyPredicateFuture(universe, context), + universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseBoundedParallel( + this, expression, depth, universePredicate, context, callback, packageSemaphore)); } /** @@ -1465,4 +1356,32 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } } } + + /** Pair of a key and a depth, useful for driving usages of {@link MinDepthUniquifier}. */ + public static class KeyAtDepth { + public final SkyKey key; + public final int depth; + + public KeyAtDepth(SkyKey key, int depth) { + this.key = key; + this.depth = depth; + } + + @Override + public int hashCode() { + // N.B. - We deliberately use a garbage-free hashCode implementation (rather than e.g. + // Objects#hash). This method is very hot during large visitations done by + // ParallelSkyQueryUtils. + return 31 * key.hashCode() + Integer.hashCode(depth); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof KeyAtDepth)) { + return false; + } + KeyAtDepth other = (KeyAtDepth) obj; + return key.equals(other.key) && depth == other.depth; + } + } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java index 5a3b91f575..6292b7f969 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java @@ -13,7 +13,6 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; -import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; @@ -58,11 +57,27 @@ public class AllRdepsFunction implements QueryFunction { QueryExpression expression, List<Argument> args, Callback<T> callback) { - return evalRdeps(env, context, args, callback, Optional.<Predicate<T>>absent()); + boolean isDepthUnbounded = args.size() == 1; + int depth = isDepthUnbounded ? Integer.MAX_VALUE : args.get(1).getInteger(); + QueryExpression argumentExpression = args.get(0).getExpression(); + if (env instanceof StreamableQueryEnvironment) { + StreamableQueryEnvironment<T> streamableEnv = (StreamableQueryEnvironment<T>) env; + return isDepthUnbounded + ? streamableEnv.getAllRdepsUnboundedParallel(argumentExpression, context, callback) + : streamableEnv.getAllRdepsBoundedParallel(argumentExpression, depth, context, callback); + } else { + return eval( + env, + argumentExpression, + Predicates.<T>alwaysTrue(), + context, + callback, + depth); + } } - /** Evaluates rdeps query. */ - public static <T> QueryTaskFuture<Void> eval( + /** Common non-parallel implementation of depth-bounded allrdeps/deps. */ + static <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, QueryExpression expression, final Predicate<T> universe, @@ -100,25 +115,4 @@ public class AllRdepsFunction implements QueryFunction { } }); } - - static <T> QueryTaskFuture<Void> evalRdeps( - final QueryEnvironment<T> env, - VariableContext<T> context, - final List<Argument> args, - final Callback<T> callback, - Optional<Predicate<T>> universeMaybe) { - final int depth = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE; - final Predicate<T> universe = universeMaybe.isPresent() - ? universeMaybe.get() - : Predicates.<T>alwaysTrue(); - if (env instanceof StreamableQueryEnvironment<?>) { - StreamableQueryEnvironment<T> streamableEnv = ((StreamableQueryEnvironment<T>) env); - return depth == Integer.MAX_VALUE && !universeMaybe.isPresent() - ? streamableEnv.getAllRdepsUnboundedParallel(args.get(0).getExpression(), context, callback) - : streamableEnv.getAllRdeps( - args.get(0).getExpression(), universe, context, callback, depth); - } else { - return eval(env, args.get(0).getExpression(), universe, context, callback, depth); - } - } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java index c62bea5754..92edef59b4 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java @@ -23,9 +23,25 @@ import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; */ @ThreadSafe public interface MinDepthUniquifier<T> { + + /** + * Returns whether {@code newElement} hasn't been seen before at depth less than or equal to + * {@code depth} by {@link #uniqueAtDepthLessThanOrEqualTo(T, int)} or + * {@link #uniqueAtDepthLessThanOrEqualTo(Iterable, int)}. + * + * <p>Please note the difference between this method and + * {@link #uniqueAtDepthLessThanOrEqualTo(T, int)}! + * + * <p>This method is inherently racy wrt {@link #uniqueAtDepthLessThanOrEqualTo(T, int)} and + * {@link #uniqueAtDepthLessThanOrEqualTo(Iterable, int)}. Only use it if you know what you are + * doing. + */ + boolean uniqueAtDepthLessThanOrEqualToPure(T newElement, int depth); + /** - * Returns the subset of {@code newElements} that haven't been seen before at depths less than or - * equal to {@code depth} + * Returns whether {@code newElement} hasn't been seen before at depth less than or equal to + * {@code depth} by {@link #uniqueAtDepthLessThanOrEqualTo(T, int)} or + * {@link #uniqueAtDepthLessThanOrEqualTo(Iterable, int)}. * * <p> There's a natural benign check-then-act race in all concurrent uses of this interface. * Imagine we have an element e, two depths d1 and d2 (with d2 < d1), and two threads T1 and T2. @@ -33,6 +49,13 @@ public interface MinDepthUniquifier<T> { * But before T1 finishes processing e, T2 may think _it's_ about to be first one to process an * element at a depth less than or equal to than d2. T1's work is probably wasted. */ + boolean uniqueAtDepthLessThanOrEqualTo(T newElement, int depth); + + /** + * Batch version of {@link #uniqueAtDepthLessThanOrEqualTo(Object, int)}. + * + * <p>The same benign check-then-act race applies here too. + */ ImmutableList<T> uniqueAtDepthLessThanOrEqualTo(Iterable<T> newElements, int depth); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java index 3c6714b943..f2a9939ff2 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java @@ -267,27 +267,41 @@ public final class QueryUtil { @Override public final ImmutableList<T> uniqueAtDepthLessThanOrEqualTo( Iterable<T> newElements, int depth) { - ImmutableList.Builder<T> result = ImmutableList.builder(); - for (T element : newElements) { - AtomicInteger newDepth = new AtomicInteger(depth); - AtomicInteger previousDepth = - alreadySeenAtDepth.putIfAbsent(extractor.extractKey(element), newDepth); - if (previousDepth != null) { + ImmutableList.Builder<T> resultBuilder = ImmutableList.builder(); + for (T newElement : newElements) { + if (uniqueAtDepthLessThanOrEqualTo(newElement, depth)) { + resultBuilder.add(newElement); + } + } + return resultBuilder.build(); + } + + @Override + public boolean uniqueAtDepthLessThanOrEqualTo(T newElement, int depth) { + AtomicInteger newDepth = new AtomicInteger(depth); + AtomicInteger previousDepth = + alreadySeenAtDepth.putIfAbsent(extractor.extractKey(newElement), newDepth); + if (previousDepth == null) { + return true; + } + if (depth < previousDepth.get()) { + synchronized (previousDepth) { if (depth < previousDepth.get()) { - synchronized (previousDepth) { - if (depth < previousDepth.get()) { - // We've seen the element before, but never at a depth this shallow. - previousDepth.set(depth); - result.add(element); - } - } + // We've seen the element before, but never at a depth this shallow. + previousDepth.set(depth); + return true; } - } else { - // We've never seen the element before. - result.add(element); } } - return result.build(); + return false; + } + + @Override + public boolean uniqueAtDepthLessThanOrEqualToPure(T newElement, int depth) { + AtomicInteger previousDepth = alreadySeenAtDepth.get(extractor.extractKey(newElement)); + return previousDepth != null + ? depth < previousDepth.get() + : true; } } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java index 3971474edf..e40960a843 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java @@ -14,7 +14,6 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; @@ -53,35 +52,48 @@ public final class RdepsFunction extends AllRdepsFunction { @Override public <T> QueryTaskFuture<Void> eval( - final QueryEnvironment<T> env, - final VariableContext<T> context, - final QueryExpression expression, - final List<Argument> args, - final Callback<T> callback) { + QueryEnvironment<T> env, + VariableContext<T> context, + QueryExpression expression, + List<Argument> args, + Callback<T> callback) { boolean isDepthUnbounded = args.size() == 2; - return (isDepthUnbounded && env instanceof StreamableQueryEnvironment) - ? ((StreamableQueryEnvironment<T>) env) - .getRdepsUnboundedInUniverseParallel(expression, context, args, callback) - : evalWithBoundedDepth(env, context, expression, args, callback); + int depth = isDepthUnbounded ? Integer.MAX_VALUE : args.get(2).getInteger(); + QueryExpression universeExpression = args.get(0).getExpression(); + QueryExpression argumentExpression = args.get(1).getExpression(); + if (env instanceof StreamableQueryEnvironment) { + StreamableQueryEnvironment<T> streamableEnv = (StreamableQueryEnvironment<T>) env; + return isDepthUnbounded + ? streamableEnv.getRdepsUnboundedParallel( + argumentExpression, universeExpression, context, callback) + : streamableEnv.getRdepsBoundedParallel( + argumentExpression, depth, universeExpression, context, callback); + } else { + return evalWithBoundedDepth( + env, expression, context, argumentExpression, depth, universeExpression, callback); + } } /** * Compute the transitive closure of the universe, then breadth-first search from the argument * towards the universe while staying within the transitive closure. */ - public static <T> QueryTaskFuture<Void> evalWithBoundedDepth( + private static <T> QueryTaskFuture<Void> evalWithBoundedDepth( QueryEnvironment<T> env, + QueryExpression rdepsFunctionExpressionForErrorMessages, VariableContext<T> context, - QueryExpression expression, - final List<Argument> args, + QueryExpression argumentExpression, + int depth, + QueryExpression universeExpression, Callback<T> callback) { QueryTaskFuture<ThreadSafeMutableSet<T>> universeValueFuture = - QueryUtil.evalAll(env, context, args.get(0).getExpression()); + QueryUtil.evalAll(env, context, universeExpression); Function<ThreadSafeMutableSet<T>, QueryTaskFuture<Void>> evalInUniverseAsyncFunction = universeValue -> { Predicate<T> universe; try { - env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE); + env.buildTransitiveClosure( + rdepsFunctionExpressionForErrorMessages, universeValue, Integer.MAX_VALUE); universe = Predicates.in(env.getTransitiveClosure(universeValue)); } catch (InterruptedException e) { return env.immediateCancelledFuture(); @@ -89,8 +101,8 @@ public final class RdepsFunction extends AllRdepsFunction { return env.immediateFailedFuture(e); } - return AllRdepsFunction.evalRdeps( - env, context, args.subList(1, args.size()), callback, Optional.of(universe)); + return AllRdepsFunction.eval( + env, argumentExpression, universe, context, callback, depth); }; return env.transformAsync(universeValueFuture, evalInUniverseAsyncFunction); diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java index b055d6ad0a..ef34742e58 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java @@ -13,38 +13,33 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; -import com.google.common.base.Predicate; -import java.util.List; - /** * The environment of a Blaze query which supports predefined streaming operations. * * @param <T> the node type of the dependency graph */ public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> { - - /** Retrieves and processes all reverse dependencies of given expression in a streaming manner. */ - QueryTaskFuture<Void> getAllRdeps( + QueryTaskFuture<Void> getAllRdepsBoundedParallel( QueryExpression expression, - Predicate<T> universe, + int depth, VariableContext<T> context, - Callback<T> callback, - int depth); + Callback<T> callback); - /** Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound. */ QueryTaskFuture<Void> getAllRdepsUnboundedParallel( - QueryExpression expression, VariableContext<T> context, Callback<T> callback); + QueryExpression expression, + VariableContext<T> context, + Callback<T> callback); + + QueryTaskFuture<Void> getRdepsBoundedParallel( + QueryExpression expression, + int depth, + QueryExpression universe, + VariableContext<T> context, + Callback<T> callback); - /** - * Similar to {@link #getAllRdepsUnboundedParallel} but finds rdeps in a universe without a depth - * depth. - * - * @param expression a "rdeps" expression without depth, such as rdeps(u, x) - * @param args two-item list containing both universe 'u' and argument set 'x' in rdeps(u, x) - */ - QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel( + QueryTaskFuture<Void> getRdepsUnboundedParallel( QueryExpression expression, + QueryExpression universe, VariableContext<T> context, - List<Argument> args, Callback<T> callback); } |