aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2
diff options
context:
space:
mode:
authorGravatar nharmata <nharmata@google.com>2018-04-12 15:31:26 -0700
committerGravatar Copybara-Service <copybara-piper@google.com>2018-04-12 15:33:05 -0700
commit398e6dab1092740e38a4ff8657a2f8dee9ee7c20 (patch)
tree09da67203e0d378c015f785958c5c9f7f7dfe813 /src/main/java/com/google/devtools/build/lib/query2
parent06cb84dcf9a0decdd8ff74464137ccd48eb9646e (diff)
Provide parallel implementations of bounded allrdeps and rdeps.
RELNOTES: None PiperOrigin-RevId: 192681579
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java590
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java4
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java253
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java44
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java27
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java48
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java46
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java35
8 files changed, 695 insertions, 352 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 585b39d695..e0a09851fa 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -16,21 +16,31 @@ package com.google.devtools.build.lib.query2;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Maps;
+import com.google.common.collect.MapMaker;
import com.google.common.collect.Multimap;
+import com.google.common.collect.Streams;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.Callback;
+import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ThreadSafeMutableSet;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
+import com.google.devtools.build.lib.query2.engine.QueryUtil;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.query2.engine.VariableContext;
@@ -40,7 +50,8 @@ import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.skyframe.SkyKey;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -63,10 +74,6 @@ public class ParallelSkyQueryUtils {
private ParallelSkyQueryUtils() {
}
- /**
- * Specialized parallel variant of {@link SkyQueryEnvironment#getAllRdeps} that is appropriate
- * when there is no depth-bound.
- */
static QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
SkyQueryEnvironment env,
QueryExpression expression,
@@ -77,7 +84,95 @@ public class ParallelSkyQueryUtils {
expression,
context,
ParallelVisitor.createParallelVisitorCallback(
- new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore)));
+ new RdepsUnboundedVisitor.Factory(
+ env,
+ /*universe=*/ Predicates.alwaysTrue(),
+ callback,
+ packageSemaphore)));
+ }
+
+ static QueryTaskFuture<Void> getAllRdepsBoundedParallel(
+ SkyQueryEnvironment env,
+ QueryExpression expression,
+ int depth,
+ VariableContext<Target> context,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ return env.eval(
+ expression,
+ context,
+ ParallelVisitor.createParallelVisitorCallback(
+ new RdepsBoundedVisitor.Factory(
+ env,
+ depth,
+ /*universe=*/ Predicates.alwaysTrue(),
+ callback,
+ packageSemaphore)));
+ }
+
+ static QueryTaskFuture<Void> getRdepsInUniverseUnboundedParallel(
+ SkyQueryEnvironment env,
+ QueryExpression expression,
+ Predicate<SkyKey> universe,
+ VariableContext<Target> context,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ return env.eval(
+ expression,
+ context,
+ ParallelVisitor.createParallelVisitorCallback(
+ new RdepsUnboundedVisitor.Factory(env, universe, callback, packageSemaphore)));
+ }
+
+ static QueryTaskFuture<Predicate<SkyKey>> getDTCSkyKeyPredicateFuture(
+ SkyQueryEnvironment env,
+ QueryExpression expression,
+ VariableContext<Target> context,
+ int processResultsBatchSize,
+ int concurrencyLevel) {
+ QueryTaskFuture<ThreadSafeMutableSet<Target>> universeValueFuture =
+ QueryUtil.evalAll(env, context, expression);
+
+ Function<ThreadSafeMutableSet<Target>, QueryTaskFuture<Predicate<SkyKey>>>
+ getTransitiveClosureAsyncFunction =
+ universeValue -> {
+ ThreadSafeAggregateAllSkyKeysCallback aggregateAllCallback =
+ new ThreadSafeAggregateAllSkyKeysCallback(concurrencyLevel);
+ return env.executeAsync(
+ () -> {
+ Callback<Target> visitorCallback =
+ ParallelVisitor.createParallelVisitorCallback(
+ new TransitiveClosureVisitor.Factory(
+ env,
+ env.createSkyKeyUniquifier(),
+ processResultsBatchSize,
+ aggregateAllCallback));
+ visitorCallback.process(universeValue);
+ return Predicates.in(aggregateAllCallback.getResult());
+ });
+ };
+
+ return env.transformAsync(universeValueFuture, getTransitiveClosureAsyncFunction);
+ }
+
+ static QueryTaskFuture<Void> getRdepsInUniverseBoundedParallel(
+ SkyQueryEnvironment env,
+ QueryExpression expression,
+ int depth,
+ Predicate<SkyKey> universe,
+ VariableContext<Target> context,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ return env.eval(
+ expression,
+ context,
+ ParallelVisitor.createParallelVisitorCallback(
+ new RdepsBoundedVisitor.Factory(
+ env,
+ depth,
+ universe,
+ callback,
+ packageSemaphore)));
}
/** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */
@@ -191,8 +286,75 @@ public class ParallelSkyQueryUtils {
}
}
+ private abstract static class AbstractRdepsVisitor<T> extends ParallelVisitor<T, Target> {
+ private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE;
+
+ protected final SkyQueryEnvironment env;
+ protected final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+
+ protected AbstractRdepsVisitor(
+ SkyQueryEnvironment env,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE);
+ this.env = env;
+ this.packageSemaphore = packageSemaphore;
+ }
+
+ @Override
+ protected void processPartialResults(
+ Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+ throws QueryException, InterruptedException {
+ Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+ env.makePackageKeyToTargetKeyMap(keysToUseForResult);
+ Set<PackageIdentifier> pkgIdsNeededForResult =
+ packageKeyToTargetKeyMap
+ .keySet()
+ .stream()
+ .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
+ .collect(toImmutableSet());
+ packageSemaphore.acquireAll(pkgIdsNeededForResult);
+ try {
+ callback.process(
+ env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values());
+ } finally {
+ packageSemaphore.releaseAll(pkgIdsNeededForResult);
+ }
+ }
+
+ protected abstract SkyKey getRdepOfVisit(T visit);
+
+ @Override
+ protected Iterable<Task> getVisitTasks(Collection<T> pendingVisits) {
+ // Group pending visitation by the package of the rdep, since we'll be targetfying the
+ // rdep during the visitation.
+ ListMultimap<PackageIdentifier, T> visitsByPackage = ArrayListMultimap.create();
+ for (T visit : pendingVisits) {
+ Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(getRdepOfVisit(visit));
+ if (label != null) {
+ visitsByPackage.put(label.getPackageIdentifier(), visit);
+ }
+ }
+
+ ImmutableList.Builder<Task> builder = ImmutableList.builder();
+
+ // A couple notes here:
+ // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we
+ // want.
+ // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
+ // accidentally retaining the entire ArrayListMultimap object.
+ for (Iterable<T> visitBatch :
+ Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) {
+ builder.add(new VisitTask(visitBatch));
+ }
+
+ return builder.build();
+ }
+ }
+
/**
- * A helper class that computes 'allrdeps(<blah>)' via BFS.
+ * A helper class that computes unbounded 'allrdeps(<expr>)' or
+ * 'rdeps(<precomputed-universe>, <expr>)' via BFS.
*
* <p>The visitor uses {@link DepAndRdep} to keep track the nodes to visit and avoid dealing with
* targetification of reverse deps until they are needed. The rdep node itself is needed to filter
@@ -202,10 +364,7 @@ public class ParallelSkyQueryUtils {
* risk of OOMs. The additional memory usage should not be a large concern here, as even with 10M
* edges, the memory overhead is around 160M, and the memory can be reclaimed by regular GC.
*/
- private static class AllRdepsUnboundedVisitor extends ParallelVisitor<DepAndRdep, Target> {
- private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE;
- private final SkyQueryEnvironment env;
- private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+ private static class RdepsUnboundedVisitor extends AbstractRdepsVisitor<DepAndRdep> {
/**
* A {@link Uniquifier} for visitations. Solely used for {@link #getUniqueValues}, which
* actually isn't that useful. See the method javadoc.
@@ -215,25 +374,26 @@ public class ParallelSkyQueryUtils {
* A {@link Uniquifier} for *valid* visitations of rdeps. {@code env}'s dependency filter might
* mean that some rdep edges are invalid, meaning that any individual {@link DepAndRdep}
* visitation may actually be invalid. Because the same rdep can be reached through more than
- * one reverse edge, It'd be incorrectly to naively dedupe visitations solely based on the rdep.
+ * one reverse edge, it'd be incorrect to naively dedupe visitations solely based on the rdep.
*/
private final Uniquifier<SkyKey> validRdepUniquifier;
+ private final Predicate<SkyKey> universe;
- private AllRdepsUnboundedVisitor(
+ private RdepsUnboundedVisitor(
SkyQueryEnvironment env,
Uniquifier<DepAndRdep> depAndRdepUniquifier,
Uniquifier<SkyKey> validRdepUniquifier,
+ Predicate<SkyKey> universe,
Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
- super(callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE);
- this.env = env;
+ super(env, callback, packageSemaphore);
this.depAndRdepUniquifier = depAndRdepUniquifier;
this.validRdepUniquifier = validRdepUniquifier;
- this.packageSemaphore = packageSemaphore;
+ this.universe = universe;
}
/**
- * A {@link Factory} for {@link AllRdepsUnboundedVisitor} instances, each of which will be used
+ * A {@link Factory} for {@link RdepsUnboundedVisitor} instances, each of which will be used
* to perform visitation of the reverse transitive closure of the {@link Target}s passed in a
* single {@link Callback#process} call. Note that all the created instances share the same
* {@link Uniquifier} so that we don't visit the same Skyframe node more than once.
@@ -242,14 +402,17 @@ public class ParallelSkyQueryUtils {
private final SkyQueryEnvironment env;
private final Uniquifier<DepAndRdep> depAndRdepUniquifier;
private final Uniquifier<SkyKey> validRdepUniquifier;
+ private final Predicate<SkyKey> universe;
private final Callback<Target> callback;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private Factory(
SkyQueryEnvironment env,
+ Predicate<SkyKey> universe,
Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
this.env = env;
+ this.universe = universe;
this.depAndRdepUniquifier = new UniquifierImpl<>(depAndRdep -> depAndRdep);
this.validRdepUniquifier = env.createSkyKeyUniquifier();
this.callback = callback;
@@ -258,31 +421,29 @@ public class ParallelSkyQueryUtils {
@Override
public ParallelVisitor<DepAndRdep, Target> create() {
- return new AllRdepsUnboundedVisitor(
- env, depAndRdepUniquifier, validRdepUniquifier, callback, packageSemaphore);
+ return new RdepsUnboundedVisitor(
+ env, depAndRdepUniquifier, validRdepUniquifier, universe, callback, packageSemaphore);
}
}
@Override
protected Visit getVisitResult(Iterable<DepAndRdep> depAndRdeps) throws InterruptedException {
- Collection<SkyKey> filteredUniqueKeys = new ArrayList<>();
+ Collection<SkyKey> validRdeps = new ArrayList<>();
- // Build a raw reverse dep map from pairs of SkyKeys to filter out the disallowed deps.
- Map<SkyKey, Collection<SkyKey>> reverseDepsMap = Maps.newHashMap();
+ // Multimap of dep to all the reverse deps in this visitation. Used to filter out the
+ // disallowed deps.
+ Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create();
for (DepAndRdep depAndRdep : depAndRdeps) {
// The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
- if (depAndRdep.dep == null && validRdepUniquifier.unique(depAndRdep.rdep)) {
- filteredUniqueKeys.add(depAndRdep.rdep);
- continue;
+ if (depAndRdep.dep == null) {
+ validRdeps.add(depAndRdep.rdep);
+ } else {
+ reverseDepMultimap.put(depAndRdep.dep, depAndRdep.rdep);
}
-
- reverseDepsMap.computeIfAbsent(depAndRdep.dep, k -> new LinkedList<SkyKey>());
-
- reverseDepsMap.get(depAndRdep.dep).add(depAndRdep.rdep);
}
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
- env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values()));
+ env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values()));
Set<PackageIdentifier> pkgIdsNeededForTargetification =
packageKeyToTargetKeyMap
.keySet()
@@ -294,95 +455,49 @@ public class ParallelSkyQueryUtils {
try {
// Filter out disallowed deps. We cannot defer the targetification any further as we do not
// want to retrieve the rdeps of unwanted nodes (targets).
- if (!reverseDepsMap.isEmpty()) {
+ if (!reverseDepMultimap.isEmpty()) {
Collection<Target> filteredTargets =
env.filterRawReverseDepsOfTransitiveTraversalKeys(
- reverseDepsMap, packageKeyToTargetKeyMap);
+ reverseDepMultimap.asMap(), packageKeyToTargetKeyMap);
filteredTargets
.stream()
.map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
- .forEachOrdered(
- rdep -> {
- if (validRdepUniquifier.unique(rdep)) {
- filteredUniqueKeys.add(rdep);
- }
- });
- }
+ .forEachOrdered(validRdeps::add);
+ }
} finally {
packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
}
+ ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream()
+ .filter(validRdepUniquifier::unique)
+ .collect(ImmutableList.toImmutableList());
+
// Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
// recursive visitation.
- Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps =
- env.graph.getReverseDeps(filteredUniqueKeys);
-
- ImmutableList.Builder<DepAndRdep> builder = ImmutableList.builder();
- for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) {
- for (SkyKey rdep : rdeps.getValue()) {
- Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep);
- if (label != null) {
- builder.add(new DepAndRdep(rdeps.getKey(), rdep));
- }
- }
- }
+ ImmutableList.Builder<DepAndRdep> depAndRdepsToVisitBuilder = ImmutableList.builder();
+ env.graph.getReverseDeps(uniqueValidRdeps).entrySet()
+ .forEach(reverseDepsEntry -> depAndRdepsToVisitBuilder.addAll(
+ Iterables.transform(
+ Iterables.filter(
+ reverseDepsEntry.getValue(),
+ Predicates.and(SkyQueryEnvironment.IS_TTV, universe)),
+ rdep -> new DepAndRdep(reverseDepsEntry.getKey(), rdep))));
return new Visit(
- /*keysToUseForResult=*/ filteredUniqueKeys, /*keysToVisit=*/ builder.build());
- }
-
- @Override
- protected void processPartialResults(
- Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
- throws QueryException, InterruptedException {
- Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
- env.makePackageKeyToTargetKeyMap(keysToUseForResult);
- Set<PackageIdentifier> pkgIdsNeededForResult =
- packageKeyToTargetKeyMap
- .keySet()
- .stream()
- .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
- .collect(toImmutableSet());
- packageSemaphore.acquireAll(pkgIdsNeededForResult);
- try {
- callback.process(
- env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values());
- } finally {
- packageSemaphore.releaseAll(pkgIdsNeededForResult);
- }
+ /*keysToUseForResult=*/ uniqueValidRdeps,
+ /*keysToVisit=*/ depAndRdepsToVisitBuilder.build());
}
@Override
protected Iterable<DepAndRdep> preprocessInitialVisit(Iterable<SkyKey> keys) {
- return Iterables.transform(keys, key -> new DepAndRdep(null, key));
+ return Iterables.transform(
+ Iterables.filter(keys, k -> universe.apply(k)),
+ key -> new DepAndRdep(null, key));
}
@Override
- protected Iterable<Task> getVisitTasks(Collection<DepAndRdep> pendingKeysToVisit) {
- // Group pending (dep, rdep) visits by the package of the rdep, since we'll be targetfying the
- // rdep during the visitation.
- ListMultimap<PackageIdentifier, DepAndRdep> visitsByPackage =
- ArrayListMultimap.create();
- for (DepAndRdep depAndRdep : pendingKeysToVisit) {
- Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(depAndRdep.rdep);
- if (label != null) {
- visitsByPackage.put(label.getPackageIdentifier(), depAndRdep);
- }
- }
-
- ImmutableList.Builder<Task> builder = ImmutableList.builder();
-
- // A couple notes here:
- // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we
- // want.
- // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
- // accidentally retaining the entire ArrayListMultimap object.
- for (Iterable<DepAndRdep> depAndRdepBatch :
- Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) {
- builder.add(new VisitTask(depAndRdepBatch));
- }
-
- return builder.build();
+ protected SkyKey getRdepOfVisit(DepAndRdep visit) {
+ return visit.rdep;
}
@Override
@@ -396,7 +511,7 @@ public class ParallelSkyQueryUtils {
// Still, we include an implementation of 'getUniqueValues' that is correct in isolation so as
// to not be depending on implementation details of 'ParallelVisitor'.
//
- // Even so, there's value in not visiting a rdep if it's already been visiting *validly*
+ // Even so, there's value in not visiting a rdep if it's already been visited *validly*
// before. We use the intentionally racy {@link Uniquifier#uniquePure} to attempt to do this.
return depAndRdepUniquifier.unique(
Iterables.filter(
@@ -404,5 +519,278 @@ public class ParallelSkyQueryUtils {
depAndRdep -> validRdepUniquifier.uniquePure(depAndRdep.rdep)));
}
}
+
+ private static class DepAndRdepAtDepth {
+ private final DepAndRdep depAndRdep;
+ private final int rdepDepth;
+
+ private DepAndRdepAtDepth(DepAndRdep depAndRdep, int rdepDepth) {
+ this.depAndRdep = depAndRdep;
+ this.rdepDepth = rdepDepth;
+ }
+ }
+
+ /**
+ * A helper class that computes bounded 'allrdeps(<expr>, <depth>)' or
+ * 'rdeps(<precomputed-universe>, <expr>, <depth>)' via BFS.
+ *
+ * <p>This is very similar to {@link RdepsUnboundedVisitor}. A lot of the same concerns apply here
+ * but there are additional subtle concerns about the correctness of the bounded traversal: just
+ * like for the sequential implementation of bounded allrdeps, we use {@link MinDepthUniquifier}.
+ */
+ private static class RdepsBoundedVisitor extends AbstractRdepsVisitor<DepAndRdepAtDepth> {
+ private final int depth;
+ private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier;
+ private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier;
+ private final Predicate<SkyKey> universe;
+
+ private RdepsBoundedVisitor(
+ SkyQueryEnvironment env,
+ int depth,
+ Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier,
+ MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier,
+ Predicate<SkyKey> universe,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ super(env, callback, packageSemaphore);
+ this.depth = depth;
+ this.depAndRdepAtDepthUniquifier = depAndRdepAtDepthUniquifier;
+ this.validRdepMinDepthUniquifier = validRdepMinDepthUniquifier;
+ this.universe = universe;
+ }
+
+ private static class Factory implements ParallelVisitor.Factory {
+ private final SkyQueryEnvironment env;
+ private final int depth;
+ private final Uniquifier<DepAndRdepAtDepth> depAndRdepAtDepthUniquifier;
+ private final MinDepthUniquifier<SkyKey> validRdepMinDepthUniquifier;
+ private final Predicate<SkyKey> universe;
+ private final Callback<Target> callback;
+ private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
+
+ private Factory(
+ SkyQueryEnvironment env,
+ int depth,
+ Predicate<SkyKey> universe,
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ this.env = env;
+ this.depth = depth;
+ this.universe = universe;
+ this.depAndRdepAtDepthUniquifier =
+ new UniquifierImpl<>(depAndRdepAtDepth -> depAndRdepAtDepth);
+ this.validRdepMinDepthUniquifier = env.createMinDepthSkyKeyUniquifier();
+ this.callback = callback;
+ this.packageSemaphore = packageSemaphore;
+ }
+
+ @Override
+ public ParallelVisitor<DepAndRdepAtDepth, Target> create() {
+ return new RdepsBoundedVisitor(
+ env,
+ depth,
+ depAndRdepAtDepthUniquifier,
+ validRdepMinDepthUniquifier,
+ universe,
+ callback,
+ packageSemaphore);
+ }
+ }
+
+ @Override
+ protected Visit getVisitResult(Iterable<DepAndRdepAtDepth> depAndRdepAtDepths)
+ throws InterruptedException {
+ Map<SkyKey, Integer> shallowestRdepDepthMap = new HashMap<>();
+ depAndRdepAtDepths.forEach(
+ depAndRdepAtDepth -> shallowestRdepDepthMap.merge(
+ depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth, Integer::min));
+
+ Collection<SkyKey> validRdeps = new ArrayList<>();
+
+ // Multimap of dep to all the reverse deps in this visitation. Used to filter out the
+ // disallowed deps.
+ Multimap<SkyKey, SkyKey> reverseDepMultimap = ArrayListMultimap.create();
+ for (DepAndRdepAtDepth depAndRdepAtDepth : depAndRdepAtDepths) {
+ // The "roots" of our visitation (see #preprocessInitialVisit) have a null 'dep' field.
+ if (depAndRdepAtDepth.depAndRdep.dep == null) {
+ validRdeps.add(depAndRdepAtDepth.depAndRdep.rdep);
+ } else {
+ reverseDepMultimap.put(
+ depAndRdepAtDepth.depAndRdep.dep, depAndRdepAtDepth.depAndRdep.rdep);
+ }
+ }
+
+ Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+ env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepMultimap.values()));
+ Set<PackageIdentifier> pkgIdsNeededForTargetification =
+ packageKeyToTargetKeyMap
+ .keySet()
+ .stream()
+ .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
+ .collect(toImmutableSet());
+ packageSemaphore.acquireAll(pkgIdsNeededForTargetification);
+
+ try {
+ // Filter out disallowed deps. We cannot defer the targetification any further as we do not
+ // want to retrieve the rdeps of unwanted nodes (targets).
+ if (!reverseDepMultimap.isEmpty()) {
+ Collection<Target> filteredTargets =
+ env.filterRawReverseDepsOfTransitiveTraversalKeys(
+ reverseDepMultimap.asMap(), packageKeyToTargetKeyMap);
+ filteredTargets
+ .stream()
+ .map(SkyQueryEnvironment.TARGET_TO_SKY_KEY)
+ .forEachOrdered(validRdeps::add);
+ }
+ } finally {
+ packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
+ }
+
+ ImmutableList<SkyKey> uniqueValidRdeps = validRdeps.stream()
+ .filter(validRdep -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(
+ validRdep, shallowestRdepDepthMap.get(validRdep)))
+ .collect(ImmutableList.toImmutableList());
+
+ // Don't bother getting the rdeps of the rdeps that are already at the depth bound.
+ Iterable<SkyKey> uniqueValidRdepsBelowDepthBound = Iterables.filter(
+ uniqueValidRdeps,
+ uniqueValidRdep -> shallowestRdepDepthMap.get(uniqueValidRdep) < depth);
+
+ // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
+ // recursive visitation.
+ Map<SkyKey, Iterable<SkyKey>> unfilteredRdepsOfRdeps =
+ env.graph.getReverseDeps(uniqueValidRdepsBelowDepthBound);
+
+ ImmutableList.Builder<DepAndRdepAtDepth> depAndRdepAtDepthsToVisitBuilder =
+ ImmutableList.builder();
+ unfilteredRdepsOfRdeps.entrySet().forEach(entry -> {
+ SkyKey rdep = entry.getKey();
+ int depthOfRdepOfRdep = shallowestRdepDepthMap.get(rdep) + 1;
+ Streams.stream(entry.getValue())
+ .filter(Predicates.and(SkyQueryEnvironment.IS_TTV, universe))
+ .forEachOrdered(rdepOfRdep -> {
+ depAndRdepAtDepthsToVisitBuilder.add(
+ new DepAndRdepAtDepth(new DepAndRdep(rdep, rdepOfRdep), depthOfRdepOfRdep));
+ });
+ });
+
+ return new Visit(
+ /*keysToUseForResult=*/ uniqueValidRdeps,
+ /*keysToVisit=*/ depAndRdepAtDepthsToVisitBuilder.build());
+ }
+
+ @Override
+ protected Iterable<DepAndRdepAtDepth> preprocessInitialVisit(Iterable<SkyKey> keys) {
+ return Iterables.transform(
+ Iterables.filter(keys, k -> universe.apply(k)),
+ key -> new DepAndRdepAtDepth(new DepAndRdep(null, key), 0));
+ }
+
+ @Override
+ protected SkyKey getRdepOfVisit(DepAndRdepAtDepth visit) {
+ return visit.depAndRdep.rdep;
+ }
+
+ @Override
+ protected ImmutableList<DepAndRdepAtDepth> getUniqueValues(
+ Iterable<DepAndRdepAtDepth> depAndRdepAtDepths) {
+ // See the comment in RdepsUnboundedVisitor#getUniqueValues.
+ return depAndRdepAtDepthUniquifier.unique(
+ Iterables.filter(
+ depAndRdepAtDepths,
+ depAndRdepAtDepth -> validRdepMinDepthUniquifier.uniqueAtDepthLessThanOrEqualToPure(
+ depAndRdepAtDepth.depAndRdep.rdep, depAndRdepAtDepth.rdepDepth)));
+ }
+ }
+
+ /** Helper class that computes DTC in the form of {@link SkyKey} via BFS. */
+ // TODO(nharmata): This should only be for the TTV-land DTC (i.e. only follow TTV -> TTV edges).
+ private static class TransitiveClosureVisitor extends ParallelVisitor<SkyKey, SkyKey> {
+ private final SkyQueryEnvironment env;
+ private final Uniquifier<SkyKey> uniquifier;
+
+ private TransitiveClosureVisitor(
+ SkyQueryEnvironment env,
+ Uniquifier<SkyKey> uniquifier,
+ int processResultsBatchSize,
+ AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) {
+ super(aggregateAllCallback, VISIT_BATCH_SIZE, processResultsBatchSize);
+ this.env = env;
+ this.uniquifier = uniquifier;
+ }
+
+ private static class Factory implements ParallelVisitor.Factory {
+ private final SkyQueryEnvironment env;
+ private final Uniquifier<SkyKey> uniquifier;
+ private final AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback;
+ private final int processResultsBatchSize;
+
+ private Factory(
+ SkyQueryEnvironment env,
+ Uniquifier<SkyKey> uniquifier,
+ int processResultsBatchSize,
+ AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> aggregateAllCallback) {
+ this.env = env;
+ this.uniquifier = uniquifier;
+ this.processResultsBatchSize = processResultsBatchSize;
+ this.aggregateAllCallback = aggregateAllCallback;
+ }
+
+ @Override
+ public ParallelVisitor<SkyKey, SkyKey> create() {
+ return new TransitiveClosureVisitor(
+ env, uniquifier, processResultsBatchSize, aggregateAllCallback);
+ }
+ }
+
+ @Override
+ protected void processPartialResults(
+ Iterable<SkyKey> keysToUseForResult, Callback<SkyKey> callback)
+ throws QueryException, InterruptedException {
+ callback.process(keysToUseForResult);
+ }
+
+ @Override
+ protected Visit getVisitResult(Iterable<SkyKey> values) throws InterruptedException {
+ Multimap<SkyKey, SkyKey> deps = env.getDirectDepsOfSkyKeys(values);
+ return new Visit(
+ /*keysToUseForResult=*/ deps.keySet(),
+ /*keysToVisit=*/ ImmutableSet.copyOf(deps.values()));
+ }
+
+ @Override
+ protected Iterable<SkyKey> preprocessInitialVisit(Iterable<SkyKey> keys) {
+ return keys;
+ }
+
+ @Override
+ protected ImmutableList<SkyKey> getUniqueValues(Iterable<SkyKey> values) {
+ return uniquifier.unique(values);
+ }
+ }
+
+ /** Thread-safe {@link AggregateAllCallback} backed by a concurrent {@link Set}. */
+ @ThreadSafe
+ private static class ThreadSafeAggregateAllSkyKeysCallback
+ implements AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> {
+
+ private final Set<SkyKey> results;
+
+ private ThreadSafeAggregateAllSkyKeysCallback(int concurrencyLevel) {
+ this.results =
+ Collections.newSetFromMap(new MapMaker().concurrencyLevel(concurrencyLevel).makeMap());
+ }
+
+ @Override
+ public void process(Iterable<SkyKey> partialResult)
+ throws QueryException, InterruptedException {
+ Iterables.addAll(results, partialResult);
+ }
+
+ @Override
+ public ImmutableSet<SkyKey> getResult() {
+ return ImmutableSet.copyOf(results);
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java
index bd20259da4..3ea72273a8 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java
@@ -15,7 +15,6 @@ package com.google.devtools.build.lib.query2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Streams;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.BlockingStack;
@@ -153,8 +152,7 @@ public abstract class ParallelVisitor<T, V> {
void visitAndWaitForCompletion(Iterable<SkyKey> keys)
throws QueryException, InterruptedException {
- Streams.stream(getUniqueValues(preprocessInitialVisit(keys))).forEachOrdered(
- processingQueue::add);
+ getUniqueValues(preprocessInitialVisit(keys)).forEach(processingQueue::add);
executor.visitAndWaitForCompletion();
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 8b95172cc4..b13ed4750c 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -16,7 +16,6 @@ package com.google.devtools.build.lib.query2;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ascii;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@@ -27,6 +26,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;
@@ -112,11 +112,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
@@ -506,6 +504,21 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
return result;
}
+ /**
+ * Returns deps in the form of {@link SkyKey}s.
+ *
+ * <p>The implementation of this method does not filter deps, therefore it is expected to be used
+ * only when {@link SkyQueryEnvironment#dependencyFilter} is set to {@link
+ * DependencyFilter#ALL_DEPS}.
+ */
+ Multimap<SkyKey, SkyKey> getDirectDepsOfSkyKeys(Iterable<SkyKey> keys)
+ throws InterruptedException {
+ Preconditions.checkState(dependencyFilter == DependencyFilter.ALL_DEPS, dependencyFilter);
+ ImmutableMultimap.Builder<SkyKey, SkyKey> builder = ImmutableMultimap.builder();
+ graph.getDirectDeps(keys).forEach(builder::putAll);
+ return builder.build();
+ }
+
@Override
public Collection<Target> getReverseDeps(Iterable<Target> targets) throws InterruptedException {
return getReverseDepsOfTransitiveTraversalKeys(Iterables.transform(targets, TARGET_TO_SKY_KEY));
@@ -648,6 +661,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
@ThreadSafe
+ protected MinDepthUniquifier<SkyKey> createMinDepthSkyKeyUniquifier() {
+ return new MinDepthUniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
+ }
+
+ @ThreadSafe
Uniquifier<Target> createTargetUniquifier() {
return new UniquifierImpl<>(TargetKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
@@ -894,15 +912,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
// so no preloading of target patterns is necessary.
}
+ static final Predicate<SkyKey> IS_TTV = SkyFunctionName.functionIs(Label.TRANSITIVE_TRAVERSAL);
+
static final Function<SkyKey, Label> SKYKEY_TO_LABEL =
- skyKey -> {
- SkyFunctionName functionName = skyKey.functionName();
- if (!functionName.equals(Label.TRANSITIVE_TRAVERSAL)) {
- // Skip non-targets.
- return null;
- }
- return (Label) skyKey.argument();
- };
+ skyKey -> IS_TTV.apply(skyKey) ? (Label) skyKey.argument() : null;
+
static final Function<SkyKey, PackageIdentifier> PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER =
skyKey -> (PackageIdentifier) skyKey.argument();
@@ -1266,176 +1280,53 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
this, expression, context, callback, packageSemaphore);
}
+ @ThreadSafe
@Override
- public QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel(
+ public QueryTaskFuture<Void> getAllRdepsBoundedParallel(
QueryExpression expression,
+ int depth,
VariableContext<Target> context,
- List<Argument> args,
Callback<Target> callback) {
- return RdepsFunction.evalWithBoundedDepth(this, context, expression, args, callback);
+ return ParallelSkyQueryUtils.getAllRdepsBoundedParallel(
+ this, expression, depth, context, callback, packageSemaphore);
+ }
+
+ protected QueryTaskFuture<Predicate<SkyKey>> getUniverseDTCSkyKeyPredicateFuture(
+ QueryExpression universe,
+ VariableContext<Target> context) {
+ return ParallelSkyQueryUtils.getDTCSkyKeyPredicateFuture(
+ this,
+ universe,
+ context,
+ BATCH_CALLBACK_SIZE,
+ DEFAULT_THREAD_COUNT);
}
@ThreadSafe
@Override
- public QueryTaskFuture<Void> getAllRdeps(
+ public QueryTaskFuture<Void> getRdepsUnboundedParallel(
QueryExpression expression,
- Predicate<Target> universe,
+ QueryExpression universe,
VariableContext<Target> context,
- Callback<Target> callback,
- int depth) {
- return getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
+ Callback<Target> callback) {
+ return transformAsync(
+ getUniverseDTCSkyKeyPredicateFuture(universe, context),
+ universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseUnboundedParallel(
+ this, expression, universePredicate, context, callback, packageSemaphore));
}
- /**
- * Computes and applies the callback to the reverse dependencies of the expression.
- *
- * <p>Batch size is used to only populate at most N targets at one time, because some individual
- * nodes are directly depended on by a large number of other nodes.
- */
- @VisibleForTesting
- protected QueryTaskFuture<Void> getAllRdeps(
+ @ThreadSafe
+ @Override
+ public QueryTaskFuture<Void> getRdepsBoundedParallel(
QueryExpression expression,
- Predicate<Target> universe,
- VariableContext<Target> context,
- Callback<Target> callback,
int depth,
- int batchSize) {
- MinDepthUniquifier<Target> minDepthUniquifier = createMinDepthUniquifier();
- return eval(
- expression,
- context,
- new BatchAllRdepsCallback(minDepthUniquifier, universe, callback, depth, batchSize));
- }
-
- private class BatchAllRdepsCallback implements Callback<Target> {
- private final MinDepthUniquifier<Target> minDepthUniquifier;
- private final Predicate<Target> universe;
- private final Callback<Target> callback;
- private final int depth;
- private final int batchSize;
-
- private BatchAllRdepsCallback(
- MinDepthUniquifier<Target> minDepthUniquifier,
- Predicate<Target> universe,
- Callback<Target> callback,
- int depth,
- int batchSize) {
- this.minDepthUniquifier = minDepthUniquifier;
- this.universe = universe;
- this.callback = callback;
- this.depth = depth;
- this.batchSize = batchSize;
- }
-
- @Override
- public void process(Iterable<Target> targets) throws QueryException, InterruptedException {
- Iterable<Target> currentInUniverse = Iterables.filter(targets, universe);
- ImmutableList<Target> uniqueTargets =
- minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, 0);
- callback.process(uniqueTargets);
-
- // Maintain a queue to allow tracking rdep relationships in BFS order. Rdeps are stored
- // as 1:N SkyKey mappings instead of fully populated Targets to save memory. Targets
- // have a reference to their entire Package, which is really memory expensive.
- Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue = new LinkedList<>();
- reverseDepsQueue.addAll(
- graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueTargets)).entrySet());
-
- // In each iteration, we populate a size-limited (no more than batchSize) number of
- // SkyKey mappings to targets, and append the SkyKey rdeps mappings to the queue. Once
- // processed by callback, the targets are dequeued and not referenced any more, making
- // them available for garbage collection.
-
- for (int curDepth = 1; curDepth <= depth; curDepth++) {
- // The mappings between nodes and their reverse deps must be preserved instead of the
- // reverse deps alone. Later when deserializing dependent nodes using SkyKeys, we need to
- // check if their allowed deps contain the dependencies.
- Map<SkyKey, Iterable<SkyKey>> reverseDepsMap = Maps.newHashMap();
- int batch = 0; // Tracking the current total number of rdeps in reverseDepsMap.
- int processed = 0;
- // Save current size as when we are process nodes in the current level, new mappings (for
- // the next level) are added to the queue.
- int size = reverseDepsQueue.size();
- while (processed < size) {
- // We always peek the first element in the queue without polling it, to determine if
- // adding it to the pending list will break the limit of max size. If yes then we process
- // and empty the pending list first, and poll the element in the next iteration.
- Map.Entry<SkyKey, Iterable<SkyKey>> entry = reverseDepsQueue.peek();
-
- // The value of the entry is either a CompactHashSet or ImmutableList, which can return
- // the size in O(1) time.
- int rdepsSize = Iterables.size(entry.getValue());
- if (rdepsSize == 0) {
- reverseDepsQueue.poll();
- processed++;
- continue;
- }
-
- if ((rdepsSize + batch <= batchSize)) {
- // If current size is less than batch size, dequeue the node, update the current
- // batch size and map.
- reverseDepsMap.put(entry.getKey(), entry.getValue());
- batch += rdepsSize;
- reverseDepsQueue.poll();
- processed++;
- } else {
- if (batch == 0) {
- // The (single) node has more rdeps than the limit, divide them up to process
- // separately.
- for (Iterable<SkyKey> subList : Iterables.partition(entry.getValue(), batchSize)) {
- reverseDepsMap.put(entry.getKey(), subList);
- processReverseDepsMap(
- minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
- }
-
- reverseDepsQueue.poll();
- processed++;
- } else {
- // There are some nodes in the pending process list. Process them first and come
- // back to this node later (in next iteration).
- processReverseDepsMap(
- minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
- batch = 0;
- }
- }
- }
-
- if (!reverseDepsMap.isEmpty()) {
- processReverseDepsMap(
- minDepthUniquifier, reverseDepsMap, callback, reverseDepsQueue, curDepth);
- }
-
- // If the queue is empty after all nodes in the current level are processed, stop
- // processing as there are no more reverse deps.
- if (reverseDepsQueue.isEmpty()) {
- break;
- }
- }
- }
-
- /**
- * Populates {@link Target}s from reverse dep mappings of {@link SkyKey}s, empties the pending
- * list and add next level reverse dep mappings of {@link SkyKey}s to the queue.
- */
- private void processReverseDepsMap(
- MinDepthUniquifier<Target> minDepthUniquifier,
- Map<SkyKey, Iterable<SkyKey>> reverseDepsMap,
- Callback<Target> callback,
- Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue,
- int depth)
- throws QueryException, InterruptedException {
- Collection<Target> children = processRawReverseDeps(targetifyValues(reverseDepsMap));
- Iterable<Target> currentInUniverse = Iterables.filter(children, universe);
- ImmutableList<Target> uniqueChildren =
- minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(currentInUniverse, depth);
- reverseDepsMap.clear();
-
- if (!uniqueChildren.isEmpty()) {
- callback.process(uniqueChildren);
- reverseDepsQueue.addAll(
- graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueChildren)).entrySet());
- }
- }
+ QueryExpression universe,
+ VariableContext<Target> context,
+ Callback<Target> callback) {
+ return transformAsync(
+ getUniverseDTCSkyKeyPredicateFuture(universe, context),
+ universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseBoundedParallel(
+ this, expression, depth, universePredicate, context, callback, packageSemaphore));
}
/**
@@ -1465,4 +1356,32 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
}
}
+
+ /** Pair of a key and a depth, useful for driving usages of {@link MinDepthUniquifier}. */
+ public static class KeyAtDepth {
+ public final SkyKey key;
+ public final int depth;
+
+ public KeyAtDepth(SkyKey key, int depth) {
+ this.key = key;
+ this.depth = depth;
+ }
+
+ @Override
+ public int hashCode() {
+ // N.B. - We deliberately use a garbage-free hashCode implementation (rather than e.g.
+ // Objects#hash). This method is very hot during large visitations done by
+ // ParallelSkyQueryUtils.
+ return 31 * key.hashCode() + Integer.hashCode(depth);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof KeyAtDepth)) {
+ return false;
+ }
+ KeyAtDepth other = (KeyAtDepth) obj;
+ return key.equals(other.key) && depth == other.depth;
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
index 5a3b91f575..6292b7f969 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
@@ -13,7 +13,6 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
@@ -58,11 +57,27 @@ public class AllRdepsFunction implements QueryFunction {
QueryExpression expression,
List<Argument> args,
Callback<T> callback) {
- return evalRdeps(env, context, args, callback, Optional.<Predicate<T>>absent());
+ boolean isDepthUnbounded = args.size() == 1;
+ int depth = isDepthUnbounded ? Integer.MAX_VALUE : args.get(1).getInteger();
+ QueryExpression argumentExpression = args.get(0).getExpression();
+ if (env instanceof StreamableQueryEnvironment) {
+ StreamableQueryEnvironment<T> streamableEnv = (StreamableQueryEnvironment<T>) env;
+ return isDepthUnbounded
+ ? streamableEnv.getAllRdepsUnboundedParallel(argumentExpression, context, callback)
+ : streamableEnv.getAllRdepsBoundedParallel(argumentExpression, depth, context, callback);
+ } else {
+ return eval(
+ env,
+ argumentExpression,
+ Predicates.<T>alwaysTrue(),
+ context,
+ callback,
+ depth);
+ }
}
- /** Evaluates rdeps query. */
- public static <T> QueryTaskFuture<Void> eval(
+ /** Common non-parallel implementation of depth-bounded allrdeps/deps. */
+ static <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
QueryExpression expression,
final Predicate<T> universe,
@@ -100,25 +115,4 @@ public class AllRdepsFunction implements QueryFunction {
}
});
}
-
- static <T> QueryTaskFuture<Void> evalRdeps(
- final QueryEnvironment<T> env,
- VariableContext<T> context,
- final List<Argument> args,
- final Callback<T> callback,
- Optional<Predicate<T>> universeMaybe) {
- final int depth = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
- final Predicate<T> universe = universeMaybe.isPresent()
- ? universeMaybe.get()
- : Predicates.<T>alwaysTrue();
- if (env instanceof StreamableQueryEnvironment<?>) {
- StreamableQueryEnvironment<T> streamableEnv = ((StreamableQueryEnvironment<T>) env);
- return depth == Integer.MAX_VALUE && !universeMaybe.isPresent()
- ? streamableEnv.getAllRdepsUnboundedParallel(args.get(0).getExpression(), context, callback)
- : streamableEnv.getAllRdeps(
- args.get(0).getExpression(), universe, context, callback, depth);
- } else {
- return eval(env, args.get(0).getExpression(), universe, context, callback, depth);
- }
- }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java
index c62bea5754..92edef59b4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java
@@ -23,9 +23,25 @@ import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
*/
@ThreadSafe
public interface MinDepthUniquifier<T> {
+
+ /**
+ * Returns whether {@code newElement} hasn't been seen before at depth less than or equal to
+ * {@code depth} by {@link #uniqueAtDepthLessThanOrEqualTo(T, int)} or
+ * {@link #uniqueAtDepthLessThanOrEqualTo(Iterable, int)}.
+ *
+ * <p>Please note the difference between this method and
+ * {@link #uniqueAtDepthLessThanOrEqualTo(T, int)}!
+ *
+ * <p>This method is inherently racy wrt {@link #uniqueAtDepthLessThanOrEqualTo(T, int)} and
+ * {@link #uniqueAtDepthLessThanOrEqualTo(Iterable, int)}. Only use it if you know what you are
+ * doing.
+ */
+ boolean uniqueAtDepthLessThanOrEqualToPure(T newElement, int depth);
+
/**
- * Returns the subset of {@code newElements} that haven't been seen before at depths less than or
- * equal to {@code depth}
+ * Returns whether {@code newElement} hasn't been seen before at depth less than or equal to
+ * {@code depth} by {@link #uniqueAtDepthLessThanOrEqualTo(T, int)} or
+ * {@link #uniqueAtDepthLessThanOrEqualTo(Iterable, int)}.
*
* <p> There's a natural benign check-then-act race in all concurrent uses of this interface.
* Imagine we have an element e, two depths d1 and d2 (with d2 < d1), and two threads T1 and T2.
@@ -33,6 +49,13 @@ public interface MinDepthUniquifier<T> {
* But before T1 finishes processing e, T2 may think _it's_ about to be first one to process an
* element at a depth less than or equal to than d2. T1's work is probably wasted.
*/
+ boolean uniqueAtDepthLessThanOrEqualTo(T newElement, int depth);
+
+ /**
+ * Batch version of {@link #uniqueAtDepthLessThanOrEqualTo(Object, int)}.
+ *
+ * <p>The same benign check-then-act race applies here too.
+ */
ImmutableList<T> uniqueAtDepthLessThanOrEqualTo(Iterable<T> newElements, int depth);
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
index 3c6714b943..f2a9939ff2 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
@@ -267,27 +267,41 @@ public final class QueryUtil {
@Override
public final ImmutableList<T> uniqueAtDepthLessThanOrEqualTo(
Iterable<T> newElements, int depth) {
- ImmutableList.Builder<T> result = ImmutableList.builder();
- for (T element : newElements) {
- AtomicInteger newDepth = new AtomicInteger(depth);
- AtomicInteger previousDepth =
- alreadySeenAtDepth.putIfAbsent(extractor.extractKey(element), newDepth);
- if (previousDepth != null) {
+ ImmutableList.Builder<T> resultBuilder = ImmutableList.builder();
+ for (T newElement : newElements) {
+ if (uniqueAtDepthLessThanOrEqualTo(newElement, depth)) {
+ resultBuilder.add(newElement);
+ }
+ }
+ return resultBuilder.build();
+ }
+
+ @Override
+ public boolean uniqueAtDepthLessThanOrEqualTo(T newElement, int depth) {
+ AtomicInteger newDepth = new AtomicInteger(depth);
+ AtomicInteger previousDepth =
+ alreadySeenAtDepth.putIfAbsent(extractor.extractKey(newElement), newDepth);
+ if (previousDepth == null) {
+ return true;
+ }
+ if (depth < previousDepth.get()) {
+ synchronized (previousDepth) {
if (depth < previousDepth.get()) {
- synchronized (previousDepth) {
- if (depth < previousDepth.get()) {
- // We've seen the element before, but never at a depth this shallow.
- previousDepth.set(depth);
- result.add(element);
- }
- }
+ // We've seen the element before, but never at a depth this shallow.
+ previousDepth.set(depth);
+ return true;
}
- } else {
- // We've never seen the element before.
- result.add(element);
}
}
- return result.build();
+ return false;
+ }
+
+ @Override
+ public boolean uniqueAtDepthLessThanOrEqualToPure(T newElement, int depth) {
+ AtomicInteger previousDepth = alreadySeenAtDepth.get(extractor.extractKey(newElement));
+ return previousDepth != null
+ ? depth < previousDepth.get()
+ : true;
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
index 3971474edf..e40960a843 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
@@ -14,7 +14,6 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
@@ -53,35 +52,48 @@ public final class RdepsFunction extends AllRdepsFunction {
@Override
public <T> QueryTaskFuture<Void> eval(
- final QueryEnvironment<T> env,
- final VariableContext<T> context,
- final QueryExpression expression,
- final List<Argument> args,
- final Callback<T> callback) {
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ Callback<T> callback) {
boolean isDepthUnbounded = args.size() == 2;
- return (isDepthUnbounded && env instanceof StreamableQueryEnvironment)
- ? ((StreamableQueryEnvironment<T>) env)
- .getRdepsUnboundedInUniverseParallel(expression, context, args, callback)
- : evalWithBoundedDepth(env, context, expression, args, callback);
+ int depth = isDepthUnbounded ? Integer.MAX_VALUE : args.get(2).getInteger();
+ QueryExpression universeExpression = args.get(0).getExpression();
+ QueryExpression argumentExpression = args.get(1).getExpression();
+ if (env instanceof StreamableQueryEnvironment) {
+ StreamableQueryEnvironment<T> streamableEnv = (StreamableQueryEnvironment<T>) env;
+ return isDepthUnbounded
+ ? streamableEnv.getRdepsUnboundedParallel(
+ argumentExpression, universeExpression, context, callback)
+ : streamableEnv.getRdepsBoundedParallel(
+ argumentExpression, depth, universeExpression, context, callback);
+ } else {
+ return evalWithBoundedDepth(
+ env, expression, context, argumentExpression, depth, universeExpression, callback);
+ }
}
/**
* Compute the transitive closure of the universe, then breadth-first search from the argument
* towards the universe while staying within the transitive closure.
*/
- public static <T> QueryTaskFuture<Void> evalWithBoundedDepth(
+ private static <T> QueryTaskFuture<Void> evalWithBoundedDepth(
QueryEnvironment<T> env,
+ QueryExpression rdepsFunctionExpressionForErrorMessages,
VariableContext<T> context,
- QueryExpression expression,
- final List<Argument> args,
+ QueryExpression argumentExpression,
+ int depth,
+ QueryExpression universeExpression,
Callback<T> callback) {
QueryTaskFuture<ThreadSafeMutableSet<T>> universeValueFuture =
- QueryUtil.evalAll(env, context, args.get(0).getExpression());
+ QueryUtil.evalAll(env, context, universeExpression);
Function<ThreadSafeMutableSet<T>, QueryTaskFuture<Void>> evalInUniverseAsyncFunction =
universeValue -> {
Predicate<T> universe;
try {
- env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE);
+ env.buildTransitiveClosure(
+ rdepsFunctionExpressionForErrorMessages, universeValue, Integer.MAX_VALUE);
universe = Predicates.in(env.getTransitiveClosure(universeValue));
} catch (InterruptedException e) {
return env.immediateCancelledFuture();
@@ -89,8 +101,8 @@ public final class RdepsFunction extends AllRdepsFunction {
return env.immediateFailedFuture(e);
}
- return AllRdepsFunction.evalRdeps(
- env, context, args.subList(1, args.size()), callback, Optional.of(universe));
+ return AllRdepsFunction.eval(
+ env, argumentExpression, universe, context, callback, depth);
};
return env.transformAsync(universeValueFuture, evalInUniverseAsyncFunction);
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
index b055d6ad0a..ef34742e58 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
@@ -13,38 +13,33 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.common.base.Predicate;
-import java.util.List;
-
/**
* The environment of a Blaze query which supports predefined streaming operations.
*
* @param <T> the node type of the dependency graph
*/
public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> {
-
- /** Retrieves and processes all reverse dependencies of given expression in a streaming manner. */
- QueryTaskFuture<Void> getAllRdeps(
+ QueryTaskFuture<Void> getAllRdepsBoundedParallel(
QueryExpression expression,
- Predicate<T> universe,
+ int depth,
VariableContext<T> context,
- Callback<T> callback,
- int depth);
+ Callback<T> callback);
- /** Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound. */
QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
- QueryExpression expression, VariableContext<T> context, Callback<T> callback);
+ QueryExpression expression,
+ VariableContext<T> context,
+ Callback<T> callback);
+
+ QueryTaskFuture<Void> getRdepsBoundedParallel(
+ QueryExpression expression,
+ int depth,
+ QueryExpression universe,
+ VariableContext<T> context,
+ Callback<T> callback);
- /**
- * Similar to {@link #getAllRdepsUnboundedParallel} but finds rdeps in a universe without a depth
- * depth.
- *
- * @param expression a "rdeps" expression without depth, such as rdeps(u, x)
- * @param args two-item list containing both universe 'u' and argument set 'x' in rdeps(u, x)
- */
- QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel(
+ QueryTaskFuture<Void> getRdepsUnboundedParallel(
QueryExpression expression,
+ QueryExpression universe,
VariableContext<T> context,
- List<Argument> args,
Callback<T> callback);
}