From b3610d54c87938e89e81d83810c0d16a41115a11 Mon Sep 17 00:00:00 2001 From: Googler Date: Mon, 24 Oct 2016 19:18:36 +0000 Subject: Defer targetification of SkyKeys during unbounded allrdeps evaluation -- MOS_MIGRATED_REVID=137064426 --- .../build/lib/query2/ParallelSkyQueryUtils.java | 177 +++++++++++++++------ .../build/lib/query2/SkyQueryEnvironment.java | 74 ++++++--- 2 files changed, 172 insertions(+), 79 deletions(-) (limited to 'src/main') diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index fb9841d9af..949413854e 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -13,13 +13,16 @@ // limitations under the License. package com.google.devtools.build.lib.query2; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.CompactHashSet; -import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor; import com.google.devtools.build.lib.concurrent.MoreFutures; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; @@ -31,16 +34,19 @@ import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier; import com.google.devtools.build.lib.query2.engine.VariableContext; import com.google.devtools.build.lib.skyframe.PackageValue; import com.google.devtools.build.lib.skyframe.SkyFunctions; +import com.google.devtools.build.lib.util.Pair; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.skyframe.SkyKey; +import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedList; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveAction; - /** * Parallel implementations of various functionality in {@link SkyQueryEnvironment}. * @@ -85,7 +91,7 @@ class ParallelSkyQueryUtils { } /** A helper class that computes 'rbuildfiles()' via BFS. */ - private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor { + private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor { private final SkyQueryEnvironment env; private RBuildFilesVisitor( @@ -124,16 +130,33 @@ class ParallelSkyQueryUtils { return SkyQueryEnvironment.getBuildFilesForPackageValues( env.graph.getSuccessfulValues(keysToUseForResult).values()); } + + @Override + protected Iterable preprocessInitialVisit(Iterable keys) { + return keys; + } } - /** A helper class that computes 'allrdeps()' via BFS. */ - private static class AllRdepsUnboundedVisitor extends AbstractSkyKeyBFSVisitor { + /** + * A helper class that computes 'allrdeps()' via BFS. + * + *

The visitor uses a pair of to keep track the nodes to visit and avoid + * dealing with targetification of reverse deps until they are needed. The node itself is needed + * to filter out disallowed deps later. Compared against the approach using a single SkyKey, it + * consumes 16 more bytes in a 64-bit environment for each edge. However it defers the need to + * load all the packages which have at least a target as a rdep of the current batch, thus greatly + * reduces the risk of OOMs. The additional memory usage should not be a large concern here, as + * even with 10M edges, the memory overhead is around 160M, and the memory can be reclaimed by + * regular GC. + */ + private static class AllRdepsUnboundedVisitor + extends AbstractSkyKeyBFSVisitor> { private final SkyQueryEnvironment env; private AllRdepsUnboundedVisitor( SkyQueryEnvironment env, ForkJoinPool forkJoinPool, - ThreadSafeUniquifier uniquifier, + ThreadSafeUniquifier> uniquifier, ThreadSafeCallback callback) { super(forkJoinPool, uniquifier, callback); this.env = env; @@ -149,7 +172,7 @@ class ParallelSkyQueryUtils { private static class Factory implements AbstractSkyKeyBFSVisitor.Factory { private final SkyQueryEnvironment env; private final ForkJoinPool forkJoinPool; - private final ThreadSafeUniquifier uniquifier; + private final ThreadSafeUniquifier> uniquifier; private final ThreadSafeCallback callback; private Factory( @@ -158,43 +181,74 @@ class ParallelSkyQueryUtils { ForkJoinPool forkJoinPool) { this.env = env; this.forkJoinPool = forkJoinPool; - this.uniquifier = env.createSkyKeyUniquifier(); + this.uniquifier = env.createReverseDepSkyKeyUniquifier(); this.callback = callback; } @Override - public AbstractSkyKeyBFSVisitor create() { + public AbstractSkyKeyBFSVisitor> create() { return new AllRdepsUnboundedVisitor(env, forkJoinPool, uniquifier, callback); } } @Override - protected Visit getVisitResult(Iterable keys) throws InterruptedException { - // TODO(bazel-team): Defer some of this work to the next recursive visitation. Instead, have - // this visitation merely get the Skyframe-land rdeps. - - // Note that this does more than merely get the Skyframe-land rdeps: - // (i) It only returns rdeps that have corresponding Targets. - // (ii) It only returns rdeps whose corresponding Targets have a valid dependency edge to - // their direct dep. - Iterable rdepTargets = env.getReverseDepsOfTransitiveTraversalKeys(keys); - // Group the targets by package - this way when computeImpl splits these targets into batches, - // targets in the same package are likely to be in the same batch. - ArrayListMultimap rdepKeysByPackage = ArrayListMultimap.create(); - for (Target rdepTarget : rdepTargets) { - rdepKeysByPackage.put( - rdepTarget.getLabel().getPackageIdentifier(), - SkyQueryEnvironment.TARGET_TO_SKY_KEY.apply(rdepTarget)); + protected Visit getVisitResult(Iterable> keys) + throws InterruptedException { + Collection filteredKeys = new ArrayList<>(); + + // Build a raw reverse dep map from pairs of SkyKeys to filter out the disallowed deps. + Map> reverseDepsMap = Maps.newHashMap(); + for (Pair reverseDepPair : keys) { + // First-level nodes do not have a parent node (they may have one in Skyframe but we do not + // need to retrieve them. + if (reverseDepPair.first == null) { + filteredKeys.add(Preconditions.checkNotNull(reverseDepPair.second)); + continue; + } + + if (!reverseDepsMap.containsKey(reverseDepPair.first)) { + reverseDepsMap.put(reverseDepPair.first, new LinkedList<>()); + } + + reverseDepsMap.get(reverseDepPair.first).add(reverseDepPair.second); + } + + // Filter out disallowed deps. We cannot defer the targetification any further as we do not + // want to retrieve the rdeps of unwanted nodes (targets). + if (!reverseDepsMap.isEmpty()) { + Collection filteredTargets = + env.filterRawReverseDepsOfTransitiveTraversalKeys(reverseDepsMap); + filteredKeys.addAll( + Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY)); } + + // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next + // recursive visitation. + Map> unfilteredReverseDeps = env.graph.getReverseDeps(filteredKeys); + + // Build a collection of Pairs and group by package id so we can partition them efficiently + // later. + ArrayListMultimap> rdepsByPackage = + ArrayListMultimap.create(); + for (Map.Entry> rdeps : unfilteredReverseDeps.entrySet()) { + for (SkyKey rdep : rdeps.getValue()) { + Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep); + if (label != null) { + rdepsByPackage.put(label.getPackageIdentifier(), Pair.of(rdeps.getKey(), rdep)); + } + } + } + // A couple notes here: // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we // want. // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid // accidentally retaining the entire ArrayListMultimap object. - Iterable keysToVisit = ImmutableList.copyOf(rdepKeysByPackage.values()); - return new Visit( - /*keysToUseForResult=*/ keys, - /*keysToVisit=*/ keysToVisit); + Iterable> keysToVisit = ImmutableList.copyOf(rdepsByPackage.values()); + + // TODO(shazh): Use a global pool to store keys to be returned and keys to be processed, and + // assign them to VisitTasks. It allows us to better optimize package retrieval. + return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ keysToVisit); } @Override @@ -202,6 +256,20 @@ class ParallelSkyQueryUtils { throws InterruptedException { return env.makeTargetsFromSkyKeys(keysToUseForResult).values(); } + + @Override + protected Iterable> preprocessInitialVisit(Iterable keys) { + return Iterables.transform( + keys, + new Function>() { + @Override + public Pair apply(SkyKey key) { + // Set parent of first-level nodes to null. They are handled specially in + // AllRdepsUnboundedVisitor#getVisitResult and will not be filtered later. + return Pair.of(null, key); + } + }); + } } /** @@ -218,33 +286,31 @@ class ParallelSkyQueryUtils { @Override public void process(Iterable partialResult) throws QueryException, InterruptedException { - AbstractSkyKeyBFSVisitor visitor = visitorFactory.create(); + AbstractSkyKeyBFSVisitor visitor = visitorFactory.create(); visitor.visitAndWaitForCompletion( SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult)); } } /** - * A helper class for performing a custom BFS visitation on the Skyframe graph, using - * {@link ForkJoinQuiescingExecutor}. + * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link + * ForkJoinQuiescingExecutor}. * *

The choice of {@link ForkJoinPool} over, say, AbstractQueueVisitor backed by a - * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off - * a visitation and blocks on completion of it. But this visitation may never complete if there - * are a bounded number of threads in the global thread pool used for query evaluation! + * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off a + * visitation and blocks on completion of it. But this visitation may never complete if there are + * a bounded number of threads in the global thread pool used for query evaluation! */ @ThreadSafe - private abstract static class AbstractSkyKeyBFSVisitor { + private abstract static class AbstractSkyKeyBFSVisitor { private final ForkJoinPool forkJoinPool; - private final ThreadSafeUniquifier uniquifier; + private final ThreadSafeUniquifier uniquifier; private final Callback callback; /** The maximum number of keys to visit at once. */ private static final int VISIT_BATCH_SIZE = 10000; private AbstractSkyKeyBFSVisitor( - ForkJoinPool forkJoinPool, - ThreadSafeUniquifier uniquifier, - Callback callback) { + ForkJoinPool forkJoinPool, ThreadSafeUniquifier uniquifier, Callback callback) { this.forkJoinPool = forkJoinPool; this.uniquifier = uniquifier; this.callback = callback; @@ -252,14 +318,14 @@ class ParallelSkyQueryUtils { /** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */ private static interface Factory { - AbstractSkyKeyBFSVisitor create(); + AbstractSkyKeyBFSVisitor create(); } - protected static final class Visit { + protected final class Visit { private final Iterable keysToUseForResult; - private final Iterable keysToVisit; + private final Iterable keysToVisit; - private Visit(Iterable keysToUseForResult, Iterable keysToVisit) { + private Visit(Iterable keysToUseForResult, Iterable keysToVisit) { this.keysToUseForResult = keysToUseForResult; this.keysToVisit = keysToVisit; } @@ -267,9 +333,11 @@ class ParallelSkyQueryUtils { void visitAndWaitForCompletion(Iterable keys) throws QueryException, InterruptedException { - Iterable> tasks = getTasks(new Visit( - /*keysToUseForResult=*/ ImmutableList.of(), - /*keysToVisit=*/ keys)); + Iterable> tasks = + getTasks( + new Visit( + /*keysToUseForResult=*/ ImmutableList.of(), + /*keysToVisit=*/ preprocessInitialVisit(keys))); for (ForkJoinTask task : tasks) { forkJoinPool.execute(task); } @@ -303,15 +371,15 @@ class ParallelSkyQueryUtils { } private class VisitTask extends AbstractInternalRecursiveAction { - private final Iterable keysToVisit; + private final Iterable keysToVisit; - private VisitTask(Iterable keysToVisit) { + private VisitTask(Iterable keysToVisit) { this.keysToVisit = keysToVisit; } @Override protected void computeImpl() throws InterruptedException { - ImmutableList uniqueKeys = uniquifier.unique(keysToVisit); + ImmutableList uniqueKeys = uniquifier.unique(keysToVisit); if (uniqueKeys.isEmpty()) { return; } @@ -343,8 +411,8 @@ class ParallelSkyQueryUtils { // getting and outputting results, each of which obeys the separate batch limits. // TODO(bazel-team): Attempt to group work on targets within the same package. ImmutableList.Builder> tasksBuilder = ImmutableList.builder(); - for (Iterable keysToVisitBatch - : Iterables.partition(visit.keysToVisit, VISIT_BATCH_SIZE)) { + for (Iterable keysToVisitBatch : + Iterables.partition(visit.keysToVisit, VISIT_BATCH_SIZE)) { tasksBuilder.add(new VisitTask(keysToVisitBatch)); } for (Iterable keysToUseForResultBatch : Iterables.partition( @@ -362,7 +430,10 @@ class ParallelSkyQueryUtils { Iterable keysToUseForResult) throws InterruptedException; /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */ - protected abstract Visit getVisitResult(Iterable values) throws InterruptedException; + protected abstract Visit getVisitResult(Iterable values) throws InterruptedException; + + /** Gets the first {@link Visit} representing the entry-level SkyKeys. */ + protected abstract Iterable preprocessInitialVisit(Iterable keys); } private static class RuntimeQueryException extends RuntimeException { diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 825adafdaf..1f2bcd709d 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -78,6 +78,7 @@ import com.google.devtools.build.lib.skyframe.SkyFunctions; import com.google.devtools.build.lib.skyframe.TargetPatternValue; import com.google.devtools.build.lib.skyframe.TargetPatternValue.TargetPatternKey; import com.google.devtools.build.lib.skyframe.TransitiveTraversalValue; +import com.google.devtools.build.lib.util.Pair; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.RootedPath; @@ -121,13 +122,6 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment protected static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors(); private static final int MAX_QUERY_EXPRESSION_LOG_CHARS = 1000; private static final Logger LOG = Logger.getLogger(SkyQueryEnvironment.class.getName()); - private static final Function TARGET_LABEL_FUNCTION = - new Function() { - @Override - public Label apply(Target target) { - return target.getLabel(); - } - }; private final BlazeTargetAccessor accessor = new BlazeTargetAccessor(this); private final int loadingPhaseThreads; @@ -345,14 +339,14 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment return super.evaluateQuery(expr, batchCallback); } - private Map> targetifyValues(Map> input) - throws InterruptedException { + private Map> targetifyValues( + Map> input) throws InterruptedException { ImmutableMap.Builder> result = ImmutableMap.builder(); Map allTargets = makeTargetsFromSkyKeys(Sets.newHashSet(Iterables.concat(input.values()))); - for (Map.Entry> entry : input.entrySet()) { + for (Map.Entry> entry : input.entrySet()) { Iterable skyKeys = entry.getValue(); Set targets = CompactHashSet.createWithExpectedSize(Iterables.size(skyKeys)); for (SkyKey key : skyKeys) { @@ -449,6 +443,12 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment return processRawReverseDeps(rawReverseDeps); } + /** Targetify SkyKeys of reverse deps and filter out targets whose deps are not allowed. */ + Collection filterRawReverseDepsOfTransitiveTraversalKeys( + Map> rawReverseDeps) throws InterruptedException { + return processRawReverseDeps(targetifyValues(rawReverseDeps)); + } + private Collection processRawReverseDeps(Map> rawReverseDeps) throws InterruptedException { Set result = CompactHashSet.create(); @@ -551,6 +551,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment return new ThreadSafeSkyKeyUniquifier(DEFAULT_THREAD_COUNT); } + @ThreadSafe + ThreadSafeUniquifier> createReverseDepSkyKeyUniquifier() { + return new ThreadSafeReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT); + } + @ThreadSafe @Override public void getTargetsMatchingPattern( @@ -732,18 +737,19 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment // so no preloading of target patterns is necessary. } - private static final Function SKYKEY_TO_LABEL = new Function() { - @Nullable - @Override - public Label apply(SkyKey skyKey) { - SkyFunctionName functionName = skyKey.functionName(); - if (!functionName.equals(SkyFunctions.TRANSITIVE_TRAVERSAL)) { - // Skip non-targets. - return null; - } - return (Label) skyKey.argument(); - } - }; + static final Function SKYKEY_TO_LABEL = + new Function() { + @Nullable + @Override + public Label apply(SkyKey skyKey) { + SkyFunctionName functionName = skyKey.functionName(); + if (!functionName.equals(SkyFunctions.TRANSITIVE_TRAVERSAL)) { + // Skip non-targets. + return null; + } + return (Label) skyKey.argument(); + } + }; @ThreadSafe public Map makeTargetsFromSkyKeys(Iterable keys) @@ -1011,6 +1017,22 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment } } + /** + * A uniquifer which takes a pair of parent and reverse dep, and uniquify based on the second + * element (reverse dep). + */ + private static class ThreadSafeReverseDepSkyKeyUniquifier + extends AbstractThreadSafeUniquifier, SkyKey> { + protected ThreadSafeReverseDepSkyKeyUniquifier(int concurrencyLevel) { + super(concurrencyLevel); + } + + @Override + protected SkyKey extractKey(Pair element) { + return element.second; + } + } + /** * Wraps a {@link Callback} and guarantees that all calls to the original will have at least * {@code batchThreshold} {@link Target}s, except for the final such call. @@ -1021,12 +1043,12 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment * #processLastPending} must be called to "flush" any remaining {@link Target}s through to the * original. * - *

This callback may be called from multiple threads concurrently. At most one thread will - * call the wrapped {@code callback} concurrently. + *

This callback may be called from multiple threads concurrently. At most one thread will call + * the wrapped {@code callback} concurrently. */ @ThreadSafe - private static class BatchStreamedCallback - extends OutputFormatterCallback implements ThreadSafeCallback { + private static class BatchStreamedCallback extends OutputFormatterCallback + implements ThreadSafeCallback { private final OutputFormatterCallback callback; private final ThreadSafeUniquifier uniquifier = -- cgit v1.2.3