diff options
author | Googler <noreply@google.com> | 2016-06-27 15:43:11 +0000 |
---|---|---|
committer | Dmitry Lomov <dslomov@google.com> | 2016-06-27 17:06:57 +0000 |
commit | d19116235914e058ace1023103c7c92252207694 (patch) | |
tree | 5955f65998ee779470d7d5a0bbb4ddb00befcf1d /src/main | |
parent | 48c98c9837d6b3768862bc66b53cae4708a147b7 (diff) |
Add SkyQuery-specific allrdeps implementation to allow batch getReverseDeps
with maximum limit
--
MOS_MIGRATED_REVID=125959807
Diffstat (limited to 'src/main')
3 files changed, 249 insertions, 51 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 30a7aa65a1..3a8f9baa67 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -13,6 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.query2; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Predicates; @@ -25,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet.Builder; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListeningExecutorService; @@ -59,6 +61,7 @@ import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper; import com.google.devtools.build.lib.query2.engine.RdepsFunction; +import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment; import com.google.devtools.build.lib.query2.engine.TargetLiteral; import com.google.devtools.build.lib.query2.engine.Uniquifier; import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue; @@ -91,9 +94,11 @@ import java.util.Deque; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; @@ -104,12 +109,12 @@ import javax.annotation.Nullable; /** * {@link AbstractBlazeQueryEnvironment} that introspects the Skyframe graph to find forward and - * reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in - * any particular order. As well, this class eagerly loads the full transitive closure of targets, - * even if the full closure isn't needed. + * reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in any + * particular order. As well, this class eagerly loads the full transitive closure of targets, even + * if the full closure isn't needed. */ -public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { - +public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> + implements StreamableQueryEnvironment<Target> { // 10k is likely a good balance between using batch efficiently and not blowing up memory. // TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant. private static final int BATCH_CALLBACK_SIZE = 10000; @@ -129,7 +134,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { private static final Function<Target, Label> TARGET_LABEL_FUNCTION = new Function<Target, Label>() { - + @Override public Label apply(Target target) { return target.getLabel(); @@ -337,7 +342,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input) { ImmutableMap.Builder<Target, Collection<Target>> result = ImmutableMap.builder(); - + Map<SkyKey, Target> allTargets = makeTargetsFromSkyKeys(Sets.newHashSet(Iterables.concat(input.values()))); @@ -407,10 +412,14 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { @Override public Collection<Target> getReverseDeps(Iterable<Target> targets) { - Set<Target> result = CompactHashSet.create(); Map<Target, Collection<Target>> rawReverseDeps = getRawReverseDeps(targets); warnIfMissingTargets(targets, rawReverseDeps.keySet()); + return processRawReverseDeps(rawReverseDeps); + } + + private Collection<Target> processRawReverseDeps(Map<Target, Collection<Target>> rawReverseDeps) { + Set<Target> result = CompactHashSet.create(); CompactHashSet<Target> visited = CompactHashSet.create(); Set<Label> keys = CompactHashSet.create(Collections2.transform(rawReverseDeps.keySet(), @@ -730,16 +739,16 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { } /** - * Returns FileValue keys for which there may be relevant (from the perspective of - * {@link #getRBuildFiles}) FileValues in the graph corresponding to the given - * {@code pathFragments}, which are assumed to be file paths. + * Returns FileValue keys for which there may be relevant (from the perspective of {@link + * #getRBuildFiles}) FileValues in the graph corresponding to the given {@code pathFragments}, + * which are assumed to be file paths. * * <p>To do this, we emulate the {@link ContainingPackageLookupFunction} logic: for each given * file path, we look for the nearest ancestor directory (starting with its parent directory), if * any, that has a package. The {@link PackageLookupValue} for this package tells us the package * root that we should use for the {@link RootedPath} for the {@link FileValue} key. - * - * Note that there may not be nodes in the graph corresponding to the returned SkyKeys. + * + * <p>Note that there may not be nodes in the graph corresponding to the returned SkyKeys. */ private Collection<SkyKey> getSkyKeysForFileFragments(Iterable<PathFragment> pathFragments) { Set<SkyKey> result = new HashSet<>(); @@ -933,4 +942,157 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { } } } + + @Override + public void getAllRdeps( + QueryExpression expression, + Predicate<Target> universe, + final Callback<Target> callback, + final int depth) + throws QueryException, InterruptedException { + getAllRdeps(expression, universe, callback, depth, BATCH_CALLBACK_SIZE); + } + + /** + * Computes and applies the callback to the reverse dependencies of the expression. + * + * <p>Batch size is used to only populate at most N targets at one time, because some individual + * nodes are directly depended on by a large number of other nodes. + */ + @VisibleForTesting + protected void getAllRdeps( + QueryExpression expression, + final Predicate<Target> universe, + final Callback<Target> callback, + final int depth, + final int batchSize) + throws QueryException, InterruptedException { + Uniquifier<Target> uniquifier = createUniquifier(); + eval(expression, new BatchAllRdepsCallback(uniquifier, universe, callback, depth, batchSize)); + } + + private class BatchAllRdepsCallback implements Callback<Target> { + private final Uniquifier<Target> uniquifier; + private final Predicate<Target> universe; + private final Callback<Target> callback; + private final int depth; + private final int batchSize; + + private BatchAllRdepsCallback( + Uniquifier<Target> uniquifier, + Predicate<Target> universe, + Callback<Target> callback, + int depth, + int batchSize) { + this.uniquifier = uniquifier; + this.universe = universe; + this.callback = callback; + this.depth = depth; + this.batchSize = batchSize; + } + + @Override + public void process(Iterable<Target> targets) throws QueryException, InterruptedException { + Iterable<Target> currentInUniverse = Iterables.filter(targets, universe); + ImmutableList<Target> uniqueTargets = uniquifier.unique(currentInUniverse); + callback.process(uniqueTargets); + + // Maintain a queue to allow tracking rdep relationships in BFS order. Rdeps are stored + // as 1:N SkyKey mappings instead of fully populated Targets to save memory. Targets + // have a reference to their entire Package, which is really memory expensive. + Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue = new LinkedList<>(); + reverseDepsQueue.addAll( + graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueTargets)).entrySet()); + + // In each iteration, we populate a size-limited (no more than batchSize) number of + // SkyKey mappings to targets, and append the SkyKey rdeps mappings to the queue. Once + // processed by callback, the targets are dequeued and not referenced any more, making + // them available for garbage collection. + + for (int i = 0; i < depth; i++) { + // The mappings between nodes and their reverse deps must be preserved instead of the + // reverse deps alone. Later when deserializing dependent nodes using SkyKeys, we need to + // check if their allowed deps contain the dependencies. + Map<SkyKey, Iterable<SkyKey>> reverseDepsMap = Maps.newHashMap(); + int batch = 0; // Tracking the current total number of rdeps in reverseDepsMap. + int processed = 0; + // Save current size as when we are process nodes in the current level, new mappings (for + // the next level) are added to the queue. + int size = reverseDepsQueue.size(); + while (processed < size) { + // We always peek the first element in the queue without polling it, to determine if + // adding it to the pending list will break the limit of max size. If yes then we process + // and empty the pending list first, and poll the element in the next iteration. + Map.Entry<SkyKey, Iterable<SkyKey>> entry = reverseDepsQueue.peek(); + + // The value of the entry is either a CompactHashSet or ImmutableList, which can return + // the size in O(1) time. + int rdepsSize = Iterables.size(entry.getValue()); + if (rdepsSize == 0) { + reverseDepsQueue.poll(); + processed++; + continue; + } + + if ((rdepsSize + batch <= batchSize)) { + // If current size is less than batch size, dequeue the node, update the current + // batch size and map. + reverseDepsMap.put(entry.getKey(), entry.getValue()); + batch += rdepsSize; + reverseDepsQueue.poll(); + processed++; + } else { + if (batch == 0) { + // The (single) node has more rdeps than the limit, divide them up to process + // separately. + for (Iterable<SkyKey> subList : Iterables.partition(entry.getValue(), batchSize)) { + reverseDepsMap.put(entry.getKey(), subList); + processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue); + } + + reverseDepsQueue.poll(); + processed++; + } else { + // There are some nodes in the pending process list. Process them first and come + // back to this node later (in next iteration). + processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue); + batch = 0; + } + } + } + + if (!reverseDepsMap.isEmpty()) { + processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue); + } + + // If the queue is empty after all nodes in the current level are processed, stop + // processing as there are no more reverse deps. + if (reverseDepsQueue.isEmpty()) { + break; + } + } + } + + /** + * Populates {@link Target}s from reverse dep mappings of {@link SkyKey}s, empties the pending + * list and add next level reverse dep mappings of {@link SkyKey}s to the queue. + */ + private void processReverseDepsMap( + Uniquifier<Target> uniquifier, + Map<SkyKey, Iterable<SkyKey>> reverseDepsMap, + Callback<Target> callback, + Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue) + throws QueryException, InterruptedException { + Collection<Target> children = processRawReverseDeps(makeTargetsMap(reverseDepsMap)); + Iterable<Target> currentInUniverse = Iterables.filter(children, universe); + ImmutableList<Target> uniqueChildren = uniquifier.unique(currentInUniverse); + reverseDepsMap.clear(); + + if (!uniqueChildren.isEmpty()) { + callback.process(uniqueChildren); + reverseDepsQueue.addAll( + graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueChildren)).entrySet()); + } + } + } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java index ed49ef4f93..1fe882fae9 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java @@ -34,7 +34,6 @@ import java.util.List; */ // Public because SkyQueryEnvironment needs to refer to it directly. public class AllRdepsFunction implements QueryFunction { - public AllRdepsFunction() {} @Override public String getName() { @@ -43,7 +42,7 @@ public class AllRdepsFunction implements QueryFunction { @Override public int getMandatoryArguments() { - return 1; // last argument is optional + return 1; // last argument is optional } @Override @@ -51,45 +50,50 @@ public class AllRdepsFunction implements QueryFunction { return ImmutableList.of(ArgumentType.EXPRESSION, ArgumentType.INTEGER); } - /** - * Breadth-first search from the argument while sticking to nodes satisfying the {@code universe} - * predicate. - */ - protected static <T> void eval(final QueryEnvironment<T> env, final List<Argument> args, - final Callback<T> callback, final Predicate<T> universe) - throws QueryException, InterruptedException { - final Uniquifier<T> uniquifier = env.createUniquifier(); - final int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE; - env.eval(args.get(0).getExpression(), new Callback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - Iterable<T> current = partialResult; - // We need to iterate depthBound + 1 times. - for (int i = 0; i <= depthBound; i++) { - List<T> next = new ArrayList<>(); - // Restrict to nodes satisfying the universe predicate. - Iterable<T> currentInUniverse = Iterables.filter(current, universe); - // Filter already visited nodes: if we see a node in a later round, then we don't need to - // visit it again, because the depth at which we see it must be greater than or equal to - // the last visit. - next.addAll(env.getReverseDeps(uniquifier.unique(currentInUniverse))); - callback.process(currentInUniverse); - if (next.isEmpty()) { - // Exit when there are no more nodes to visit. - break; - } - current = next; - } - - } - }); - - } - - /** Breadth-first search from the argument. */ @Override public <T> void eval(QueryEnvironment<T> env, QueryExpression expression, List<Argument> args, Callback<T> callback) throws QueryException, InterruptedException { eval(env, args, callback, Predicates.<T>alwaysTrue()); } + + protected <T> void eval( + final QueryEnvironment<T> env, + final List<Argument> args, + final Callback<T> callback, + final Predicate<T> universe) + throws QueryException, InterruptedException { + + final int depth = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE; + if (env instanceof StreamableQueryEnvironment<?>) { + ((StreamableQueryEnvironment<T>) env) + .getAllRdeps(args.get(0).getExpression(), universe, callback, depth); + } else { + final Uniquifier<T> uniquifier = env.createUniquifier(); + env.eval( + args.get(0).getExpression(), + new Callback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + Iterable<T> current = partialResult; + // We need to iterate depthBound + 1 times. + for (int i = 0; i <= depth; i++) { + List<T> next = new ArrayList<>(); + // Restrict to nodes satisfying the universe predicate. + Iterable<T> currentInUniverse = Iterables.filter(current, universe); + // Filter already visited nodes: if we see a node in a later round, then we don't + // need to visit it again, because the depth at which we see it must be greater + // than or equal to the last visit. + next.addAll(env.getReverseDeps(uniquifier.unique(currentInUniverse))); + callback.process(currentInUniverse); + if (next.isEmpty()) { + // Exit when there are no more nodes to visit. + break; + } + current = next; + } + } + }); + } + } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java new file mode 100644 index 0000000000..0f0a593267 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java @@ -0,0 +1,32 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.query2.engine; + +import com.google.common.base.Predicate; + +/** + * The environment of a Blaze query which supports predefined streaming operations. + * + * @param <T> the node type of the dependency graph + */ +public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> { + + /** Retrieve and process all reverse dependencies of given expression in a streaming manner. */ + void getAllRdeps( + QueryExpression expression, + Predicate<T> universe, + final Callback<T> callback, + final int depth) + throws QueryException, InterruptedException; +} |