aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java590
1 files changed, 489 insertions, 101 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 585b39d695..e0a09851fa 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -16,21 +16,31 @@ package com.google.devtools.build.lib.query2;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Maps;
+import com.google.common.collect.MapMaker;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Streams;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.Callback;
+import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ThreadSafeMutableSet;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
+import com.google.devtools.build.lib.query2.engine.QueryUtil;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.query2.engine.VariableContext;
@@ -40,7 +50,8 @@ import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.skyframe.SkyKey;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -63,10 +74,6 @@ public class ParallelSkyQueryUtils {
private ParallelSkyQueryUtils() {
}
- /**
- * Specialized parallel variant of {@link SkyQueryEnvironment#getAllRdeps} that is appropriate
- * when there is no depth-bound.
- */
static QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
SkyQueryEnvironment env,
QueryExpression expression,
@@ -77,7 +84,95 @@ public class ParallelSkyQueryUtils {
expression,
context,
ParallelVisitor.createParallelVisitorCallback(
- new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore)));
+ new RdepsUnboundedVisitor.Factory(
+ env,
+ /*universe=*/ Predicates.alwaysTrue(),
+ callback,
+ packageSemaphore)));
+ }
+
+ static QueryTaskFuture<Void> getAllRdepsBoundedParallel(
+ SkyQueryEnvironment env,
+ QueryExpression expression,
+ int depth,
+ VariableContext<Target> context,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ return env.eval(
+ expression,
+ context,
+ ParallelVisitor.createParallelVisitorCallback(
+ new RdepsBoundedVisitor.Factory(
+ env,
+ depth,
+ /*universe=*/ Predicates.alwaysTrue(),
+ callback,
+ packageSemaphore)));
+ }
+
+ static QueryTaskFuture<Void> getRdepsInUniverseUnboundedParallel(
+ SkyQueryEnvironment env,
+ QueryExpression expression,
+ Predicate<SkyKey> universe,
+ VariableContext<Target> context,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ return env.eval(
+ expression,
+ context,
+ ParallelVisitor.createParallelVisitorCallback(
+ new RdepsUnboundedVisitor.Factory(env, universe, callback, packageSemaphore)));
+ }
+
+ static QueryTaskFuture<Predicate<SkyKey>> getDTCSkyKeyPredicateFuture(
+ SkyQueryEnvironment env,
+ QueryExpression expression,
+ VariableContext<Target> context,
+ int processResultsBatchSize,
+ int concurrencyLevel) {
+ QueryTaskFuture<ThreadSafeMutableSet<Target>> universeValueFuture =
+ QueryUtil.evalAll(env, context, expression);
+
+ Function<ThreadSafeMutableSet<Target>, QueryTaskFuture<Predicate<SkyKey>>>
+ getTransitiveClosureAsyncFunction =
+ universeValue -> {
+ ThreadSafeAggregateAllSkyKeysCallback aggregateAllCallback =
+ new ThreadSafeAggregateAllSkyKeysCallback(concurrencyLevel);
+ return env.executeAsync(
+ () -> {
+ Callback<Target> visitorCallback =
+ ParallelVisitor.createParallelVisitorCallback(
+ new TransitiveClosureVisitor.Factory(
+ env,
+ env.createSkyKeyUniquifier(),
+ processResultsBatchSize,
+ aggregateAllCallback));
+ visitorCallback.process(universeValue);
+ return Predicates.in(aggregateAllCallback.getResult());
+ });
+ };
+
+ return env.transformAsync(universeValueFuture, getTransitiveClosureAsyncFunction);
+ }
+
+ static QueryTaskFuture<Void> getRdepsInUniverseBoundedParallel(
+ SkyQueryEnvironment env,
+ QueryExpression expression,
+ int depth,
+ Predicate<SkyKey> universe,
+ VariableContext<Target> context,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ return env.eval(
+ expression,
+ context,
+ ParallelVisitor.createParallelVisitorCallback(
+ new RdepsBoundedVisitor.Factory(
+ env,
+ depth,
+ universe,
+ callback,
+ packageSemaphore)));
}
/** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */
@@ -191,8 +286,75 @@ public class ParallelSkyQueryUtils {
}
}
+ private abstract static class AbstractRdepsVisitor<T> extends ParallelVisitor<T, Target> {
+ private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE;
+
+ protected final SkyQueryEnvironment env;
+ protected final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+
+ protected AbstractRdepsVisitor(
+ SkyQueryEnvironment env,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE);
+ this.env = env;
+ this.packageSemaphore = packageSemaphore;
+ }
+
+ @Override
+ protected void processPartialResults(
+ Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+ throws QueryException, InterruptedException {
+ Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+ env.makePackageKeyToTargetKeyMap(keysToUseForResult);
+ Set<PackageIdentifier> pkgIdsNeededForResult =
+ packageKeyToTargetKeyMap
+ .keySet()
+ .stream()
+ .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
+ .collect(toImmutableSet());
+ packageSemaphore.acquireAll(pkgIdsNeededForResult);
+ try {
+ callback.process(
+ env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values());
+ } finally {
+ packageSemaphore.releaseAll(pkgIdsNeededForResult);
+ }
+ }
+
+ protected abstract SkyKey getRdepOfVisit(T visit);
+
+ @Override
+ protected Iterable<Task> getVisitTasks(Collection<T> pendingVisits) {
+ // Group pending visitation by the package of the rdep, since we'll be targetfying the
+ // rdep during the visitation.
+ ListMultimap<PackageIdentifier, T> visitsByPackage = ArrayListMultimap.create();
+ for (T visit : pendingVisits) {
+ Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(getRdepOfVisit(visit));
+ if (label != null) {
+ visitsByPackage.put(label.getPackageIdentifier(), visit);
+ }
+ }
+
+ ImmutableList.Builder<Task> builder = ImmutableList.builder();
+
+ // A couple notes here:
+ // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we
+ // want.
+ // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
+ // accidentally retaining the entire ArrayListMultimap object.
+ for (Iterable<T> visitBatch :
+ Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) {
+ builder.add(new VisitTask(visitBatch));
+ }
+
+ return builder.build();
+ }
+ }
+
/**
- * A helper class that computes 'allrdeps(<blah>)' via BFS.
+ * A helper class that computes unbounded 'allrdeps(<expr>)' or
+ * 'rdeps(<precomputed-universe>, <expr>)' via BFS.
*
* <p>The visitor uses {@link DepAndRdep} to keep track the nodes to visit and avoid dealing with
* targetification of reverse deps until they are needed. The rdep node itself is needed to filter
@@ -202,10 +364,7 @@ public class ParallelSkyQueryUtils {
* risk of OOMs. The additional memory usage should not be a large concern here, as even with 10M
* edges, the memory overhead is around 160M, and the memory can be reclaimed by regular GC.
*/
- private static class AllRdepsUnboundedVisitor extends ParallelVisitor<DepAndRdep, Target> {
- private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE;
- private final SkyQueryEnvironment env;
- private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+ private static class RdepsUnboundedVisitor extends AbstractRdepsVisitor<DepAndRdep> {
/**
* A {@link Uniquifier} for visitations. Solely used for {@link #getUniqueValues}, which
* actually isn't that useful. See the method javadoc.
@@ -215,25 +374,26 @@ public class ParallelSkyQueryUtils {
* A {@link Uniquifier} for *valid* visitations of rdeps. {@code env}'s dependency filter might
* mean that some rdep edges are invalid, meaning that any individual {@link DepAndRdep}
* visitation may actually be invalid. Because the same rdep can be reached through more than
- * one reverse edge, It'd be incorrectly to naively dedupe visitations solely based on the rdep.
+ * one reverse edge, it'd be incorrect to naively dedupe visitations solely based on the rdep.
*/
private final Uniquifier<SkyKey> validRdepUniquifier;
+ private final Predicate<SkyKey> universe;
- private AllRdepsUnboundedVisitor(
+ private RdepsUnboundedVisitor(
SkyQueryEnvironment env,
Uniquifier<DepAndRdep> depAndRdepUniquifier,
Uniquifier<SkyKey> validRdepUniquifier,
+ Predicate<SkyKey> universe,
Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
- super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE);
- this.env = env;
+ super(env, callback, packageSemaphore);
this.depAndRdepUniquifier = depAndRdepUniquifier;
this.validRdepUniquifier = validRdepUniquifier;
- this.packageSemaphore = packageSemaphore;
+ this.universe = universe;
}
/**
- * A {@link Factory} for {@link AllRdepsUnboundedVisitor} instances, each of which will be used
+ * A {@link Factory} for {@link RdepsUnboundedVisitor} instances, each of which will be used
* to perform visitation of the reverse transitive closure of the {@link Target}s passed in a
* single {@link Callback#process} call. Note that all the created instances share the same
* {@link Uniquifier} so that we don't visit the same Skyframe node more than once.
@@ -242,14 +402,17 @@ public class ParallelSkyQueryUtils {
private final SkyQueryEnvironment env;
private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
private final Uniquifier<SkyKey> validRdepUniquifier;
+ private final Predicate<SkyKey> universe;
private final Callback<Target> callback;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private Factory(
SkyQueryEnvironment env,
+ Predicate<SkyKey> universe,
Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
this.env = env;
+ this.universe = universe;
this.depAndRdepUniquifier = new UniquifierImpl<>(depAndRdep -> depAndRdep);
this.validRdepUniquifier = env.createSkyKeyUniquifier();
this.callback = callback;
@@ -258,31 +421,29 @@ public class ParallelSkyQueryUtils {
@Override
public ParallelVisitor<DepAndRdep, Target> create() {
- return new AllRdepsUnboundedVisitor(
- env, depAndRdepUniquifier, validRdepUniquifier, callback, packageSemaphore);
+ return new RdepsUnboundedVisitor(
+ env, depAndRdepUniquifier, validRdepUniquifier, universe, callback, packageSemaphore);
}
}
@Override
protected Visit getVisitResult(Iterable<DepAndRdep> depAndRdeps) throws InterruptedException {
- Collection<SkyKey> filteredUniqueKeys = new ArrayList<>();
+ Collection<SkyKey> validRdeps = new ArrayList<>();
- // Build a raw reverse dep map from pairs of SkyKeys to filter out the disallowed deps.
- Map<SkyKey, Collection<SkyKey>> reverseDepsMap = Maps.newHashMap();
+ // Multimap of dep to all the reverse deps in this visitation. Used to filter out the
+ // disallowed deps.
+ Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create();
for (DepAndRdep depAndRdep : depAndRdeps) {
// The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
- if (depAndRdep.dep == null && validRdepUniquifier.unique(depAndRdep.rdep)) {
- filteredUniqueKeys.add(depAndRdep.rdep);
- continue;
+ if (depAndRdep.dep == null) {
+ validRdeps.add(depAndRdep.rdep);
+ } else {
+ reverseDepMultimap.put(depAndRdep.dep, depAndRdep.rdep);
}
-
- reverseDepsMap.computeIfAbsent(depAndRdep.dep, k -> new LinkedList<SkyKey>());
-
- reverseDepsMap.get(depAndRdep.dep).add(depAndRdep.rdep);
}
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
- env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values()));
+ env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values()));
Set<PackageIdentifier> pkgIdsNeededForTargetification =
packageKeyToTargetKeyMap
.keySet()
@@ -294,95 +455,49 @@ public class ParallelSkyQueryUtils {
try {
// Filter out disallowed deps. We cannot defer the targetification any further as we do not
// want to retrieve the rdeps of unwanted nodes (targets).
- if (!reverseDepsMap.isEmpty()) {
+ if (!reverseDepMultimap.isEmpty()) {
Collection<Target> filteredTargets =
env.filterRawReverseDepsOfTransitiveTraversalKeys(
- reverseDepsMap, packageKeyToTargetKeyMap);
+ reverseDepMultimap.asMap(), packageKeyToTargetKeyMap);
filteredTargets
.stream()
.map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
- .forEachOrdered(
- rdep -> {
- if (validRdepUniquifier.unique(rdep)) {
- filteredUniqueKeys.add(rdep);
- }
- });
- }
+ .forEachOrdered(validRdeps::add);
+ }
} finally {
packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
}
+ ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream()
+ .filter(validRdepUniquifier::unique)
+ .collect(ImmutableList.toImmutableList());
+
// Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
// recursive visitation.
- Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps =
- env.graph.getReverseDeps(filteredUniqueKeys);
-
- ImmutableList.Builder<DepAndRdep> builder = ImmutableList.builder();
- for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) {
- for (SkyKey rdep : rdeps.getValue()) {
- Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep);
- if (label != null) {
- builder.add(new DepAndRdep(rdeps.getKey(), rdep));
- }
- }
- }
+ ImmutableList.Builder<DepAndRdep> depAndRdepsToVisitBuilder = ImmutableList.builder();
+ env.graph.getReverseDeps(uniqueValidRdeps).entrySet()
+ .forEach(reverseDepsEntry -> depAndRdepsToVisitBuilder.addAll(
+ Iterables.transform(
+ Iterables.filter(
+ reverseDepsEntry.getValue(),
+ Predicates.and(SkyQueryEnvironment.IS_TTV, universe)),
+ rdep -> new DepAndRdep(reverseDepsEntry.getKey(), rdep))));
return new Visit(
- /*keysToUseForResult=*/ filteredUniqueKeys, /*keysToVisit=*/ builder.build());
- }
-
- @Override
- protected void processPartialResults(
- Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
- throws QueryException, InterruptedException {
- Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
- env.makePackageKeyToTargetKeyMap(keysToUseForResult);
- Set<PackageIdentifier> pkgIdsNeededForResult =
- packageKeyToTargetKeyMap
- .keySet()
- .stream()
- .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
- .collect(toImmutableSet());
- packageSemaphore.acquireAll(pkgIdsNeededForResult);
- try {
- callback.process(
- env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values());
- } finally {
- packageSemaphore.releaseAll(pkgIdsNeededForResult);
- }
+ /*keysToUseForResult=*/ uniqueValidRdeps,
+ /*keysToVisit=*/ depAndRdepsToVisitBuilder.build());
}
@Override
protected Iterable<DepAndRdep> preprocessInitialVisit(Iterable<SkyKey> keys) {
- return Iterables.transform(keys, key -> new DepAndRdep(null, key));
+ return Iterables.transform(
+ Iterables.filter(keys, k -> universe.apply(k)),
+ key -> new DepAndRdep(null, key));
}
@Override
- protected Iterable<Task> getVisitTasks(Collection<DepAndRdep> pendingKeysToVisit) {
- // Group pending (dep, rdep) visits by the package of the rdep, since we'll be targetfying the
- // rdep during the visitation.
- ListMultimap<PackageIdentifier, DepAndRdep> visitsByPackage =
- ArrayListMultimap.create();
- for (DepAndRdep depAndRdep : pendingKeysToVisit) {
- Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(depAndRdep.rdep);
- if (label != null) {
- visitsByPackage.put(label.getPackageIdentifier(), depAndRdep);
- }
- }
-
- ImmutableList.Builder<Task> builder = ImmutableList.builder();
-
- // A couple notes here:
- // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we
- // want.
- // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
- // accidentally retaining the entire ArrayListMultimap object.
- for (Iterable<DepAndRdep> depAndRdepBatch :
- Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) {
- builder.add(new VisitTask(depAndRdepBatch));
- }
-
- return builder.build();
+ protected SkyKey getRdepOfVisit(DepAndRdep visit) {
+ return visit.rdep;
}
@Override
@@ -396,7 +511,7 @@ public class ParallelSkyQueryUtils {
// Still, we include an implementation of 'getUniqueValues' that is correct in isolation so as
// to not be depending on implementation details of 'ParallelVisitor'.
//
- // Even so, there's value in not visiting a rdep if it's already been visiting *validly*
+ // Even so, there's value in not visiting a rdep if it's already been visited *validly*
// before. We use the intentionally racy {@link Uniquifier#uniquePure} to attempt to do this.
return depAndRdepUniquifier.unique(
Iterables.filter(
@@ -404,5 +519,278 @@ public class ParallelSkyQueryUtils {
depAndRdep -> validRdepUniquifier.uniquePure(depAndRdep.rdep)));
}
}
+
+ private static class DepAndRdepAtDepth {
+ private final DepAndRdep depAndRdep;
+ private final int rdepDepth;
+
+ private DepAndRdepAtDepth(DepAndRdep depAndRdep, int rdepDepth) {
+ this.depAndRdep = depAndRdep;
+ this.rdepDepth = rdepDepth;
+ }
+ }
+
+ /**
+ * A helper class that computes bounded 'allrdeps(<expr>, <depth>)' or
+ * 'rdeps(<precomputed-universe>, <expr>, <depth>)' via BFS.
+ *
+ * <p>This is very similar to {@link RdepsUnboundedVisitor}. A lot of the same concerns apply here
+ * but there are additional subtle concerns about the correctness of the bounded traversal: just
+ * like for the sequential implementation of bounded allrdeps, we use {@link MinDepthUniquifier}.
+ */
+ private static class RdepsBoundedVisitor extends AbstractRdepsVisitor<DepAndRdepAtDepth> {
+ private final int depth;
+ private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier;
+ private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier;
+ private final Predicate<SkyKey> universe;
+
+ private RdepsBoundedVisitor(
+ SkyQueryEnvironment env,
+ int depth,
+ Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier,
+ MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier,
+ Predicate<SkyKey> universe,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ super(env, callback, packageSemaphore);
+ this.depth = depth;
+ this.depAndRdepAtDepthUniquifier = depAndRdepAtDepthUniquifier;
+ this.validRdepMinDepthUniquifier = validRdepMinDepthUniquifier;
+ this.universe = universe;
+ }
+
+ private static class Factory implements ParallelVisitor.Factory {
+ private final SkyQueryEnvironment env;
+ private final int depth;
+ private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier;
+ private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier;
+ private final Predicate<SkyKey> universe;
+ private final Callback<Target> callback;
+ private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+
+ private Factory(
+ SkyQueryEnvironment env,
+ int depth,
+ Predicate<SkyKey> universe,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ this.env = env;
+ this.depth = depth;
+ this.universe = universe;
+ this.depAndRdepAtDepthUniquifier =
+ new UniquifierImpl<>(depAndRdepAtDepth -> depAndRdepAtDepth);
+ this.validRdepMinDepthUniquifier = env.createMinDepthSkyKeyUniquifier();
+ this.callback = callback;
+ this.packageSemaphore = packageSemaphore;
+ }
+
+ @Override
+ public ParallelVisitor<DepAndRdepAtDepth, Target> create() {
+ return new RdepsBoundedVisitor(
+ env,
+ depth,
+ depAndRdepAtDepthUniquifier,
+ validRdepMinDepthUniquifier,
+ universe,
+ callback,
+ packageSemaphore);
+ }
+ }
+
+ @Override
+ protected Visit getVisitResult(Iterable<DepAndRdepAtDepth> depAndRdepAtDepths)
+ throws InterruptedException {
+ Map<SkyKey, Integer> shallowestRdepDepthMap = new HashMap<>();
+ depAndRdepAtDepths.forEach(
+ depAndRdepAtDepth -> shallowestRdepDepthMap.merge(
+ depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth, Integer::min));
+
+ Collection<SkyKey> validRdeps = new ArrayList<>();
+
+ // Multimap of dep to all the reverse deps in this visitation. Used to filter out the
+ // disallowed deps.
+ Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create();
+ for (DepAndRdepAtDepth depAndRdepAtDepth : depAndRdepAtDepths) {
+ // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
+ if (depAndRdepAtDepth.depAndRdep.dep == null) {
+ validRdeps.add(depAndRdepAtDepth.depAndRdep.rdep);
+ } else {
+ reverseDepMultimap.put(
+ depAndRdepAtDepth.depAndRdep.dep, depAndRdepAtDepth.depAndRdep.rdep);
+ }
+ }
+
+ Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+ env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values()));
+ Set<PackageIdentifier> pkgIdsNeededForTargetification =
+ packageKeyToTargetKeyMap
+ .keySet()
+ .stream()
+ .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
+ .collect(toImmutableSet());
+ packageSemaphore.acquireAll(pkgIdsNeededForTargetification);
+
+ try {
+ // Filter out disallowed deps. We cannot defer the targetification any further as we do not
+ // want to retrieve the rdeps of unwanted nodes (targets).
+ if (!reverseDepMultimap.isEmpty()) {
+ Collection<Target> filteredTargets =
+ env.filterRawReverseDepsOfTransitiveTraversalKeys(
+ reverseDepMultimap.asMap(), packageKeyToTargetKeyMap);
+ filteredTargets
+ .stream()
+ .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
+ .forEachOrdered(validRdeps::add);
+ }
+ } finally {
+ packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
+ }
+
+ ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream()
+ .filter(validRdep -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(
+ validRdep, shallowestRdepDepthMap.get(validRdep)))
+ .collect(ImmutableList.toImmutableList());
+
+ // Don't bother getting the rdeps of the rdeps that are already at the depth bound.
+ Iterable<SkyKey> uniqueValidRdepsBelowDepthBound = Iterables.filter(
+ uniqueValidRdeps,
+ uniqueValidRdep -> shallowestRdepDepthMap.get(uniqueValidRdep) < depth);
+
+ // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
+ // recursive visitation.
+ Map<SkyKey, Iterable<SkyKey>> unfilteredRdepsOfRdeps =
+ env.graph.getReverseDeps(uniqueValidRdepsBelowDepthBound);
+
+ ImmutableList.Builder<DepAndRdepAtDepth> depAndRdepAtDepthsToVisitBuilder =
+ ImmutableList.builder();
+ unfilteredRdepsOfRdeps.entrySet().forEach(entry -> {
+ SkyKey rdep = entry.getKey();
+ int depthOfRdepOfRdep = shallowestRdepDepthMap.get(rdep) + 1;
+ Streams.stream(entry.getValue())
+ .filter(Predicates.and(SkyQueryEnvironment.IS_TTV, universe))
+ .forEachOrdered(rdepOfRdep -> {
+ depAndRdepAtDepthsToVisitBuilder.add(
+ new DepAndRdepAtDepth(new DepAndRdep(rdep, rdepOfRdep), depthOfRdepOfRdep));
+ });
+ });
+
+ return new Visit(
+ /*keysToUseForResult=*/ uniqueValidRdeps,
+ /*keysToVisit=*/ depAndRdepAtDepthsToVisitBuilder.build());
+ }
+
+ @Override
+ protected Iterable<DepAndRdepAtDepth> preprocessInitialVisit(Iterable<SkyKey> keys) {
+ return Iterables.transform(
+ Iterables.filter(keys, k -> universe.apply(k)),
+ key -> new DepAndRdepAtDepth(new DepAndRdep(null, key), 0));
+ }
+
+ @Override
+ protected SkyKey getRdepOfVisit(DepAndRdepAtDepth visit) {
+ return visit.depAndRdep.rdep;
+ }
+
+ @Override
+ protected ImmutableList<DepAndRdepAtDepth> getUniqueValues(
+ Iterable<DepAndRdepAtDepth> depAndRdepAtDepths) {
+ // See the comment in RdepsUnboundedVisitor#getUniqueValues.
+ return depAndRdepAtDepthUniquifier.unique(
+ Iterables.filter(
+ depAndRdepAtDepths,
+ depAndRdepAtDepth -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualToPure(
+ depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth)));
+ }
+ }
+
+ /** Helper class that computes DTC in the form of {@link SkyKey} via BFS. */
+ // TODO(nharmata): This should only be for the TTV-land DTC (i.e. only follow TTV -> TTV edges).
+ private static class TransitiveClosureVisitor extends ParallelVisitor<SkyKey, SkyKey> {
+ private final SkyQueryEnvironment env;
+ private final Uniquifier<SkyKey> uniquifier;
+
+ private TransitiveClosureVisitor(
+ SkyQueryEnvironment env,
+ Uniquifier<SkyKey> uniquifier,
+ int processResultsBatchSize,
+ AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) {
+ super(aggregateAllCallback, VISIT_BATCH_SIZE, processResultsBatchSize);
+ this.env = env;
+ this.uniquifier = uniquifier;
+ }
+
+ private static class Factory implements ParallelVisitor.Factory {
+ private final SkyQueryEnvironment env;
+ private final Uniquifier<SkyKey> uniquifier;
+ private final AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback;
+ private final int processResultsBatchSize;
+
+ private Factory(
+ SkyQueryEnvironment env,
+ Uniquifier<SkyKey> uniquifier,
+ int processResultsBatchSize,
+ AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) {
+ this.env = env;
+ this.uniquifier = uniquifier;
+ this.processResultsBatchSize = processResultsBatchSize;
+ this.aggregateAllCallback = aggregateAllCallback;
+ }
+
+ @Override
+ public ParallelVisitor<SkyKey, SkyKey> create() {
+ return new TransitiveClosureVisitor(
+ env, uniquifier, processResultsBatchSize, aggregateAllCallback);
+ }
+ }
+
+ @Override
+ protected void processPartialResults(
+ Iterable<SkyKey> keysToUseForResult, Callback<SkyKey> callback)
+ throws QueryException, InterruptedException {
+ callback.process(keysToUseForResult);
+ }
+
+ @Override
+ protected Visit getVisitResult(Iterable<SkyKey> values) throws InterruptedException {
+ Multimap<SkyKey, SkyKey> deps = env.getDirectDepsOfSkyKeys(values);
+ return new Visit(
+ /*keysToUseForResult=*/ deps.keySet(),
+ /*keysToVisit=*/ ImmutableSet.copyOf(deps.values()));
+ }
+
+ @Override
+ protected Iterable<SkyKey> preprocessInitialVisit(Iterable<SkyKey> keys) {
+ return keys;
+ }
+
+ @Override
+ protected ImmutableList<SkyKey> getUniqueValues(Iterable<SkyKey> values) {
+ return uniquifier.unique(values);
+ }
+ }
+
+ /** Thread-safe {@link AggregateAllCallback} backed by a concurrent {@link Set}. */
+ @ThreadSafe
+ private static class ThreadSafeAggregateAllSkyKeysCallback
+ implements AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> {
+
+ private final Set<SkyKey> results;
+
+ private ThreadSafeAggregateAllSkyKeysCallback(int concurrencyLevel) {
+ this.results =
+ Collections.newSetFromMap(new MapMaker().concurrencyLevel(concurrencyLevel).makeMap());
+ }
+
+ @Override
+ public void process(Iterable<SkyKey> partialResult)
+ throws QueryException, InterruptedException {
+ Iterables.addAll(results, partialResult);
+ }
+
+ @Override
+ public ImmutableSet<SkyKey> getResult() {
+ return ImmutableSet.copyOf(results);
+ }
+ }
}