aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2016-10-24 19:18:36 +0000
committerGravatar John Cater <jcater@google.com>2016-10-24 19:31:35 +0000
commitb3610d54c87938e89e81d83810c0d16a41115a11 (patch)
tree230108f475c3e75f2fa402ecec95d1796911ce02 /src/main
parentf190ae115d00e649fd32ff19b4663d1f8d51171c (diff)
Defer targetification of SkyKeys during unbounded allrdeps evaluation
-- MOS_MIGRATED_REVID=137064426
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java177
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java74
2 files changed, 172 insertions, 79 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index fb9841d9af..949413854e 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -13,13 +13,16 @@
// limitations under the License.
package com.google.devtools.build.lib.query2;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
import com.google.devtools.build.lib.concurrent.MoreFutures;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.packages.Target;
@@ -31,16 +34,19 @@ import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
import com.google.devtools.build.lib.query2.engine.VariableContext;
import com.google.devtools.build.lib.skyframe.PackageValue;
import com.google.devtools.build.lib.skyframe.SkyFunctions;
+import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.skyframe.SkyKey;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
-
/**
* Parallel implementations of various functionality in {@link SkyQueryEnvironment}.
*
@@ -85,7 +91,7 @@ class ParallelSkyQueryUtils {
}
/** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */
- private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor {
+ private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor<SkyKey> {
private final SkyQueryEnvironment env;
private RBuildFilesVisitor(
@@ -124,16 +130,33 @@ class ParallelSkyQueryUtils {
return SkyQueryEnvironment.getBuildFilesForPackageValues(
env.graph.getSuccessfulValues(keysToUseForResult).values());
}
+
+ @Override
+ protected Iterable<SkyKey> preprocessInitialVisit(Iterable<SkyKey> keys) {
+ return keys;
+ }
}
- /** A helper class that computes 'allrdeps(<blah>)' via BFS. */
- private static class AllRdepsUnboundedVisitor extends AbstractSkyKeyBFSVisitor {
+ /**
+ * A helper class that computes 'allrdeps(<blah>)' via BFS.
+ *
+ * <p>The visitor uses a pair of <node, reverse dep> to keep track the nodes to visit and avoid
+ * dealing with targetification of reverse deps until they are needed. The node itself is needed
+ * to filter out disallowed deps later. Compared against the approach using a single SkyKey, it
+ * consumes 16 more bytes in a 64-bit environment for each edge. However it defers the need to
+ * load all the packages which have at least a target as a rdep of the current batch, thus greatly
+ * reduces the risk of OOMs. The additional memory usage should not be a large concern here, as
+ * even with 10M edges, the memory overhead is around 160M, and the memory can be reclaimed by
+ * regular GC.
+ */
+ private static class AllRdepsUnboundedVisitor
+ extends AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> {
private final SkyQueryEnvironment env;
private AllRdepsUnboundedVisitor(
SkyQueryEnvironment env,
ForkJoinPool forkJoinPool,
- ThreadSafeUniquifier<SkyKey> uniquifier,
+ ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier,
ThreadSafeCallback<Target> callback) {
super(forkJoinPool, uniquifier, callback);
this.env = env;
@@ -149,7 +172,7 @@ class ParallelSkyQueryUtils {
private static class Factory implements AbstractSkyKeyBFSVisitor.Factory {
private final SkyQueryEnvironment env;
private final ForkJoinPool forkJoinPool;
- private final ThreadSafeUniquifier<SkyKey> uniquifier;
+ private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier;
private final ThreadSafeCallback<Target> callback;
private Factory(
@@ -158,43 +181,74 @@ class ParallelSkyQueryUtils {
ForkJoinPool forkJoinPool) {
this.env = env;
this.forkJoinPool = forkJoinPool;
- this.uniquifier = env.createSkyKeyUniquifier();
+ this.uniquifier = env.createReverseDepSkyKeyUniquifier();
this.callback = callback;
}
@Override
- public AbstractSkyKeyBFSVisitor create() {
+ public AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> create() {
return new AllRdepsUnboundedVisitor(env, forkJoinPool, uniquifier, callback);
}
}
@Override
- protected Visit getVisitResult(Iterable<SkyKey> keys) throws InterruptedException {
- // TODO(bazel-team): Defer some of this work to the next recursive visitation. Instead, have
- // this visitation merely get the Skyframe-land rdeps.
-
- // Note that this does more than merely get the Skyframe-land rdeps:
- // (i) It only returns rdeps that have corresponding Targets.
- // (ii) It only returns rdeps whose corresponding Targets have a valid dependency edge to
- // their direct dep.
- Iterable<Target> rdepTargets = env.getReverseDepsOfTransitiveTraversalKeys(keys);
- // Group the targets by package - this way when computeImpl splits these targets into batches,
- // targets in the same package are likely to be in the same batch.
- ArrayListMultimap<PackageIdentifier, SkyKey> rdepKeysByPackage = ArrayListMultimap.create();
- for (Target rdepTarget : rdepTargets) {
- rdepKeysByPackage.put(
- rdepTarget.getLabel().getPackageIdentifier(),
- SkyQueryEnvironment.TARGET_TO_SKY_KEY.apply(rdepTarget));
+ protected Visit getVisitResult(Iterable<Pair<SkyKey, SkyKey>> keys)
+ throws InterruptedException {
+ Collection<SkyKey> filteredKeys = new ArrayList<>();
+
+ // Build a raw reverse dep map from pairs of SkyKeys to filter out the disallowed deps.
+ Map<SkyKey, Collection<SkyKey>> reverseDepsMap = Maps.newHashMap();
+ for (Pair<SkyKey, SkyKey> reverseDepPair : keys) {
+ // First-level nodes do not have a parent node (they may have one in Skyframe but we do not
+ // need to retrieve them.
+ if (reverseDepPair.first == null) {
+ filteredKeys.add(Preconditions.checkNotNull(reverseDepPair.second));
+ continue;
+ }
+
+ if (!reverseDepsMap.containsKey(reverseDepPair.first)) {
+ reverseDepsMap.put(reverseDepPair.first, new LinkedList<>());
+ }
+
+ reverseDepsMap.get(reverseDepPair.first).add(reverseDepPair.second);
+ }
+
+ // Filter out disallowed deps. We cannot defer the targetification any further as we do not
+ // want to retrieve the rdeps of unwanted nodes (targets).
+ if (!reverseDepsMap.isEmpty()) {
+ Collection<Target> filteredTargets =
+ env.filterRawReverseDepsOfTransitiveTraversalKeys(reverseDepsMap);
+ filteredKeys.addAll(
+ Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY));
}
+
+ // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
+ // recursive visitation.
+ Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps = env.graph.getReverseDeps(filteredKeys);
+
+ // Build a collection of Pairs and group by package id so we can partition them efficiently
+ // later.
+ ArrayListMultimap<PackageIdentifier, Pair<SkyKey, SkyKey>> rdepsByPackage =
+ ArrayListMultimap.create();
+ for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) {
+ for (SkyKey rdep : rdeps.getValue()) {
+ Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep);
+ if (label != null) {
+ rdepsByPackage.put(label.getPackageIdentifier(), Pair.of(rdeps.getKey(), rdep));
+ }
+ }
+ }
+
// A couple notes here:
// (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we
// want.
// (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
// accidentally retaining the entire ArrayListMultimap object.
- Iterable<SkyKey> keysToVisit = ImmutableList.copyOf(rdepKeysByPackage.values());
- return new Visit(
- /*keysToUseForResult=*/ keys,
- /*keysToVisit=*/ keysToVisit);
+ Iterable<Pair<SkyKey, SkyKey>> keysToVisit = ImmutableList.copyOf(rdepsByPackage.values());
+
+ // TODO(shazh): Use a global pool to store keys to be returned and keys to be processed, and
+ // assign them to VisitTasks. It allows us to better optimize package retrieval.
+ return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ keysToVisit);
}
@Override
@@ -202,6 +256,20 @@ class ParallelSkyQueryUtils {
throws InterruptedException {
return env.makeTargetsFromSkyKeys(keysToUseForResult).values();
}
+
+ @Override
+ protected Iterable<Pair<SkyKey, SkyKey>> preprocessInitialVisit(Iterable<SkyKey> keys) {
+ return Iterables.transform(
+ keys,
+ new Function<SkyKey, Pair<SkyKey, SkyKey>>() {
+ @Override
+ public Pair<SkyKey, SkyKey> apply(SkyKey key) {
+ // Set parent of first-level nodes to null. They are handled specially in
+ // AllRdepsUnboundedVisitor#getVisitResult and will not be filtered later.
+ return Pair.of(null, key);
+ }
+ });
+ }
}
/**
@@ -218,33 +286,31 @@ class ParallelSkyQueryUtils {
@Override
public void process(Iterable<Target> partialResult)
throws QueryException, InterruptedException {
- AbstractSkyKeyBFSVisitor visitor = visitorFactory.create();
+ AbstractSkyKeyBFSVisitor<?> visitor = visitorFactory.create();
visitor.visitAndWaitForCompletion(
SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult));
}
}
/**
- * A helper class for performing a custom BFS visitation on the Skyframe graph, using
- * {@link ForkJoinQuiescingExecutor}.
+ * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link
+ * ForkJoinQuiescingExecutor}.
*
* <p>The choice of {@link ForkJoinPool} over, say, AbstractQueueVisitor backed by a
- * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off
- * a visitation and blocks on completion of it. But this visitation may never complete if there
- * are a bounded number of threads in the global thread pool used for query evaluation!
+ * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off a
+ * visitation and blocks on completion of it. But this visitation may never complete if there are
+ * a bounded number of threads in the global thread pool used for query evaluation!
*/
@ThreadSafe
- private abstract static class AbstractSkyKeyBFSVisitor {
+ private abstract static class AbstractSkyKeyBFSVisitor<T> {
private final ForkJoinPool forkJoinPool;
- private final ThreadSafeUniquifier<SkyKey> uniquifier;
+ private final ThreadSafeUniquifier<T> uniquifier;
private final Callback<Target> callback;
/** The maximum number of keys to visit at once. */
private static final int VISIT_BATCH_SIZE = 10000;
private AbstractSkyKeyBFSVisitor(
- ForkJoinPool forkJoinPool,
- ThreadSafeUniquifier<SkyKey> uniquifier,
- Callback<Target> callback) {
+ ForkJoinPool forkJoinPool, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) {
this.forkJoinPool = forkJoinPool;
this.uniquifier = uniquifier;
this.callback = callback;
@@ -252,14 +318,14 @@ class ParallelSkyQueryUtils {
/** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */
private static interface Factory {
- AbstractSkyKeyBFSVisitor create();
+ AbstractSkyKeyBFSVisitor<?> create();
}
- protected static final class Visit {
+ protected final class Visit {
private final Iterable<SkyKey> keysToUseForResult;
- private final Iterable<SkyKey> keysToVisit;
+ private final Iterable<T> keysToVisit;
- private Visit(Iterable<SkyKey> keysToUseForResult, Iterable<SkyKey> keysToVisit) {
+ private Visit(Iterable<SkyKey> keysToUseForResult, Iterable<T> keysToVisit) {
this.keysToUseForResult = keysToUseForResult;
this.keysToVisit = keysToVisit;
}
@@ -267,9 +333,11 @@ class ParallelSkyQueryUtils {
void visitAndWaitForCompletion(Iterable<SkyKey> keys)
throws QueryException, InterruptedException {
- Iterable<ForkJoinTask<?>> tasks = getTasks(new Visit(
- /*keysToUseForResult=*/ ImmutableList.<SkyKey>of(),
- /*keysToVisit=*/ keys));
+ Iterable<ForkJoinTask<?>> tasks =
+ getTasks(
+ new Visit(
+ /*keysToUseForResult=*/ ImmutableList.<SkyKey>of(),
+ /*keysToVisit=*/ preprocessInitialVisit(keys)));
for (ForkJoinTask<?> task : tasks) {
forkJoinPool.execute(task);
}
@@ -303,15 +371,15 @@ class ParallelSkyQueryUtils {
}
private class VisitTask extends AbstractInternalRecursiveAction {
- private final Iterable<SkyKey> keysToVisit;
+ private final Iterable<T> keysToVisit;
- private VisitTask(Iterable<SkyKey> keysToVisit) {
+ private VisitTask(Iterable<T> keysToVisit) {
this.keysToVisit = keysToVisit;
}
@Override
protected void computeImpl() throws InterruptedException {
- ImmutableList<SkyKey> uniqueKeys = uniquifier.unique(keysToVisit);
+ ImmutableList<T> uniqueKeys = uniquifier.unique(keysToVisit);
if (uniqueKeys.isEmpty()) {
return;
}
@@ -343,8 +411,8 @@ class ParallelSkyQueryUtils {
// getting and outputting results, each of which obeys the separate batch limits.
// TODO(bazel-team): Attempt to group work on targets within the same package.
ImmutableList.Builder<ForkJoinTask<?>> tasksBuilder = ImmutableList.builder();
- for (Iterable<SkyKey> keysToVisitBatch
- : Iterables.partition(visit.keysToVisit, VISIT_BATCH_SIZE)) {
+ for (Iterable<T> keysToVisitBatch :
+ Iterables.partition(visit.keysToVisit, VISIT_BATCH_SIZE)) {
tasksBuilder.add(new VisitTask(keysToVisitBatch));
}
for (Iterable<SkyKey> keysToUseForResultBatch : Iterables.partition(
@@ -362,7 +430,10 @@ class ParallelSkyQueryUtils {
Iterable<SkyKey> keysToUseForResult) throws InterruptedException;
/** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
- protected abstract Visit getVisitResult(Iterable<SkyKey> values) throws InterruptedException;
+ protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException;
+
+ /** Gets the first {@link Visit} representing the entry-level SkyKeys. */
+ protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys);
}
private static class RuntimeQueryException extends RuntimeException {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 825adafdaf..1f2bcd709d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -78,6 +78,7 @@ import com.google.devtools.build.lib.skyframe.SkyFunctions;
import com.google.devtools.build.lib.skyframe.TargetPatternValue;
import com.google.devtools.build.lib.skyframe.TargetPatternValue.TargetPatternKey;
import com.google.devtools.build.lib.skyframe.TransitiveTraversalValue;
+import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.RootedPath;
@@ -121,13 +122,6 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
protected static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
private static final int MAX_QUERY_EXPRESSION_LOG_CHARS = 1000;
private static final Logger LOG = Logger.getLogger(SkyQueryEnvironment.class.getName());
- private static final Function<Target, Label> TARGET_LABEL_FUNCTION =
- new Function<Target, Label>() {
- @Override
- public Label apply(Target target) {
- return target.getLabel();
- }
- };
private final BlazeTargetAccessor accessor = new BlazeTargetAccessor(this);
private final int loadingPhaseThreads;
@@ -345,14 +339,14 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
return super.evaluateQuery(expr, batchCallback);
}
- private Map<SkyKey, Collection<Target>> targetifyValues(Map<SkyKey, Iterable<SkyKey>> input)
- throws InterruptedException {
+ private Map<SkyKey, Collection<Target>> targetifyValues(
+ Map<SkyKey, ? extends Iterable<SkyKey>> input) throws InterruptedException {
ImmutableMap.Builder<SkyKey, Collection<Target>> result = ImmutableMap.builder();
Map<SkyKey, Target> allTargets =
makeTargetsFromSkyKeys(Sets.newHashSet(Iterables.concat(input.values())));
- for (Map.Entry<SkyKey, Iterable<SkyKey>> entry : input.entrySet()) {
+ for (Map.Entry<SkyKey, ? extends Iterable<SkyKey>> entry : input.entrySet()) {
Iterable<SkyKey> skyKeys = entry.getValue();
Set<Target> targets = CompactHashSet.createWithExpectedSize(Iterables.size(skyKeys));
for (SkyKey key : skyKeys) {
@@ -449,6 +443,12 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
return processRawReverseDeps(rawReverseDeps);
}
+ /** Targetify SkyKeys of reverse deps and filter out targets whose deps are not allowed. */
+ Collection<Target> filterRawReverseDepsOfTransitiveTraversalKeys(
+ Map<SkyKey, ? extends Iterable<SkyKey>> rawReverseDeps) throws InterruptedException {
+ return processRawReverseDeps(targetifyValues(rawReverseDeps));
+ }
+
private Collection<Target> processRawReverseDeps(Map<SkyKey, Collection<Target>> rawReverseDeps)
throws InterruptedException {
Set<Target> result = CompactHashSet.create();
@@ -552,6 +552,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
@ThreadSafe
+ ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
+ return new ThreadSafeReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
+ }
+
+ @ThreadSafe
@Override
public void getTargetsMatchingPattern(
QueryExpression owner, String pattern, Callback<Target> callback)
@@ -732,18 +737,19 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
// so no preloading of target patterns is necessary.
}
- private static final Function<SkyKey, Label> SKYKEY_TO_LABEL = new Function<SkyKey, Label>() {
- @Nullable
- @Override
- public Label apply(SkyKey skyKey) {
- SkyFunctionName functionName = skyKey.functionName();
- if (!functionName.equals(SkyFunctions.TRANSITIVE_TRAVERSAL)) {
- // Skip non-targets.
- return null;
- }
- return (Label) skyKey.argument();
- }
- };
+ static final Function<SkyKey, Label> SKYKEY_TO_LABEL =
+ new Function<SkyKey, Label>() {
+ @Nullable
+ @Override
+ public Label apply(SkyKey skyKey) {
+ SkyFunctionName functionName = skyKey.functionName();
+ if (!functionName.equals(SkyFunctions.TRANSITIVE_TRAVERSAL)) {
+ // Skip non-targets.
+ return null;
+ }
+ return (Label) skyKey.argument();
+ }
+ };
@ThreadSafe
public Map<SkyKey, Target> makeTargetsFromSkyKeys(Iterable<SkyKey> keys)
@@ -1012,6 +1018,22 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
/**
+ * A uniquifer which takes a pair of parent and reverse dep, and uniquify based on the second
+ * element (reverse dep).
+ */
+ private static class ThreadSafeReverseDepSkyKeyUniquifier
+ extends AbstractThreadSafeUniquifier<Pair<SkyKey, SkyKey>, SkyKey> {
+ protected ThreadSafeReverseDepSkyKeyUniquifier(int concurrencyLevel) {
+ super(concurrencyLevel);
+ }
+
+ @Override
+ protected SkyKey extractKey(Pair<SkyKey, SkyKey> element) {
+ return element.second;
+ }
+ }
+
+ /**
* Wraps a {@link Callback} and guarantees that all calls to the original will have at least
* {@code batchThreshold} {@link Target}s, except for the final such call.
*
@@ -1021,12 +1043,12 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
* #processLastPending} must be called to "flush" any remaining {@link Target}s through to the
* original.
*
- * <p>This callback may be called from multiple threads concurrently. At most one thread will
- * call the wrapped {@code callback} concurrently.
+ * <p>This callback may be called from multiple threads concurrently. At most one thread will call
+ * the wrapped {@code callback} concurrently.
*/
@ThreadSafe
- private static class BatchStreamedCallback
- extends OutputFormatterCallback<Target> implements ThreadSafeCallback<Target> {
+ private static class BatchStreamedCallback extends OutputFormatterCallback<Target>
+ implements ThreadSafeCallback<Target> {
private final OutputFormatterCallback<Target> callback;
private final ThreadSafeUniquifier<Target> uniquifier =