aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2016-06-27 15:43:11 +0000
committerGravatar Dmitry Lomov <dslomov@google.com>2016-06-27 17:06:57 +0000
commitd19116235914e058ace1023103c7c92252207694 (patch)
tree5955f65998ee779470d7d5a0bbb4ddb00befcf1d /src/main/java/com/google
parent48c98c9837d6b3768862bc66b53cae4708a147b7 (diff)
Add SkyQuery-specific allrdeps implementation to allow batch getReverseDeps
with maximum limit -- MOS_MIGRATED_REVID=125959807
Diffstat (limited to 'src/main/java/com/google')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java188
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java80
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java32
3 files changed, 249 insertions, 51 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 30a7aa65a1..3a8f9baa67 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.query2;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
@@ -25,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSet.Builder;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -59,6 +61,7 @@ import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
import com.google.devtools.build.lib.query2.engine.RdepsFunction;
+import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
import com.google.devtools.build.lib.query2.engine.TargetLiteral;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue;
@@ -91,9 +94,11 @@ import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -104,12 +109,12 @@ import javax.annotation.Nullable;
/**
* {@link AbstractBlazeQueryEnvironment} that introspects the Skyframe graph to find forward and
- * reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in
- * any particular order. As well, this class eagerly loads the full transitive closure of targets,
- * even if the full closure isn't needed.
+ * reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in any
+ * particular order. As well, this class eagerly loads the full transitive closure of targets, even
+ * if the full closure isn't needed.
*/
-public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
-
+public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
+ implements StreamableQueryEnvironment<Target> {
// 10k is likely a good balance between using batch efficiently and not blowing up memory.
// TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
private static final int BATCH_CALLBACK_SIZE = 10000;
@@ -129,7 +134,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
private static final Function<Target, Label> TARGET_LABEL_FUNCTION =
new Function<Target, Label>() {
-
+
@Override
public Label apply(Target target) {
return target.getLabel();
@@ -337,7 +342,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input) {
ImmutableMap.Builder<Target, Collection<Target>> result = ImmutableMap.builder();
-
+
Map<SkyKey, Target> allTargets =
makeTargetsFromSkyKeys(Sets.newHashSet(Iterables.concat(input.values())));
@@ -407,10 +412,14 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
@Override
public Collection<Target> getReverseDeps(Iterable<Target> targets) {
- Set<Target> result = CompactHashSet.create();
Map<Target, Collection<Target>> rawReverseDeps = getRawReverseDeps(targets);
warnIfMissingTargets(targets, rawReverseDeps.keySet());
+ return processRawReverseDeps(rawReverseDeps);
+ }
+
+ private Collection<Target> processRawReverseDeps(Map<Target, Collection<Target>> rawReverseDeps) {
+ Set<Target> result = CompactHashSet.create();
CompactHashSet<Target> visited = CompactHashSet.create();
Set<Label> keys = CompactHashSet.create(Collections2.transform(rawReverseDeps.keySet(),
@@ -730,16 +739,16 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
}
/**
- * Returns FileValue keys for which there may be relevant (from the perspective of
- * {@link #getRBuildFiles}) FileValues in the graph corresponding to the given
- * {@code pathFragments}, which are assumed to be file paths.
+ * Returns FileValue keys for which there may be relevant (from the perspective of {@link
+ * #getRBuildFiles}) FileValues in the graph corresponding to the given {@code pathFragments},
+ * which are assumed to be file paths.
*
* <p>To do this, we emulate the {@link ContainingPackageLookupFunction} logic: for each given
* file path, we look for the nearest ancestor directory (starting with its parent directory), if
* any, that has a package. The {@link PackageLookupValue} for this package tells us the package
* root that we should use for the {@link RootedPath} for the {@link FileValue} key.
- *
- * Note that there may not be nodes in the graph corresponding to the returned SkyKeys.
+ *
+ * <p>Note that there may not be nodes in the graph corresponding to the returned SkyKeys.
*/
private Collection<SkyKey> getSkyKeysForFileFragments(Iterable<PathFragment> pathFragments) {
Set<SkyKey> result = new HashSet<>();
@@ -933,4 +942,157 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
}
}
}
+
+ @Override
+ public void getAllRdeps(
+ QueryExpression expression,
+ Predicate<Target> universe,
+ final Callback<Target> callback,
+ final int depth)
+ throws QueryException, InterruptedException {
+ getAllRdeps(expression, universe, callback, depth, BATCH_CALLBACK_SIZE);
+ }
+
+ /**
+ * Computes and applies the callback to the reverse dependencies of the expression.
+ *
+ * <p>Batch size is used to only populate at most N targets at one time, because some individual
+ * nodes are directly depended on by a large number of other nodes.
+ */
+ @VisibleForTesting
+ protected void getAllRdeps(
+ QueryExpression expression,
+ final Predicate<Target> universe,
+ final Callback<Target> callback,
+ final int depth,
+ final int batchSize)
+ throws QueryException, InterruptedException {
+ Uniquifier<Target> uniquifier = createUniquifier();
+ eval(expression, new BatchAllRdepsCallback(uniquifier, universe, callback, depth, batchSize));
+ }
+
+ private class BatchAllRdepsCallback implements Callback<Target> {
+ private final Uniquifier<Target> uniquifier;
+ private final Predicate<Target> universe;
+ private final Callback<Target> callback;
+ private final int depth;
+ private final int batchSize;
+
+ private BatchAllRdepsCallback(
+ Uniquifier<Target> uniquifier,
+ Predicate<Target> universe,
+ Callback<Target> callback,
+ int depth,
+ int batchSize) {
+ this.uniquifier = uniquifier;
+ this.universe = universe;
+ this.callback = callback;
+ this.depth = depth;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public void process(Iterable<Target> targets) throws QueryException, InterruptedException {
+ Iterable<Target> currentInUniverse = Iterables.filter(targets, universe);
+ ImmutableList<Target> uniqueTargets = uniquifier.unique(currentInUniverse);
+ callback.process(uniqueTargets);
+
+ // Maintain a queue to allow tracking rdep relationships in BFS order. Rdeps are stored
+ // as 1:N SkyKey mappings instead of fully populated Targets to save memory. Targets
+ // have a reference to their entire Package, which is really memory expensive.
+ Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue = new LinkedList<>();
+ reverseDepsQueue.addAll(
+ graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueTargets)).entrySet());
+
+ // In each iteration, we populate a size-limited (no more than batchSize) number of
+ // SkyKey mappings to targets, and append the SkyKey rdeps mappings to the queue. Once
+ // processed by callback, the targets are dequeued and not referenced any more, making
+ // them available for garbage collection.
+
+ for (int i = 0; i < depth; i++) {
+ // The mappings between nodes and their reverse deps must be preserved instead of the
+ // reverse deps alone. Later when deserializing dependent nodes using SkyKeys, we need to
+ // check if their allowed deps contain the dependencies.
+ Map<SkyKey, Iterable<SkyKey>> reverseDepsMap = Maps.newHashMap();
+ int batch = 0; // Tracking the current total number of rdeps in reverseDepsMap.
+ int processed = 0;
+ // Save current size as when we are process nodes in the current level, new mappings (for
+ // the next level) are added to the queue.
+ int size = reverseDepsQueue.size();
+ while (processed < size) {
+ // We always peek the first element in the queue without polling it, to determine if
+ // adding it to the pending list will break the limit of max size. If yes then we process
+ // and empty the pending list first, and poll the element in the next iteration.
+ Map.Entry<SkyKey, Iterable<SkyKey>> entry = reverseDepsQueue.peek();
+
+ // The value of the entry is either a CompactHashSet or ImmutableList, which can return
+ // the size in O(1) time.
+ int rdepsSize = Iterables.size(entry.getValue());
+ if (rdepsSize == 0) {
+ reverseDepsQueue.poll();
+ processed++;
+ continue;
+ }
+
+ if ((rdepsSize + batch <= batchSize)) {
+ // If current size is less than batch size, dequeue the node, update the current
+ // batch size and map.
+ reverseDepsMap.put(entry.getKey(), entry.getValue());
+ batch += rdepsSize;
+ reverseDepsQueue.poll();
+ processed++;
+ } else {
+ if (batch == 0) {
+ // The (single) node has more rdeps than the limit, divide them up to process
+ // separately.
+ for (Iterable<SkyKey> subList : Iterables.partition(entry.getValue(), batchSize)) {
+ reverseDepsMap.put(entry.getKey(), subList);
+ processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+ }
+
+ reverseDepsQueue.poll();
+ processed++;
+ } else {
+ // There are some nodes in the pending process list. Process them first and come
+ // back to this node later (in next iteration).
+ processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+ batch = 0;
+ }
+ }
+ }
+
+ if (!reverseDepsMap.isEmpty()) {
+ processReverseDepsMap(uniquifier, reverseDepsMap, callback, reverseDepsQueue);
+ }
+
+ // If the queue is empty after all nodes in the current level are processed, stop
+ // processing as there are no more reverse deps.
+ if (reverseDepsQueue.isEmpty()) {
+ break;
+ }
+ }
+ }
+
+ /**
+ * Populates {@link Target}s from reverse dep mappings of {@link SkyKey}s, empties the pending
+ * list and add next level reverse dep mappings of {@link SkyKey}s to the queue.
+ */
+ private void processReverseDepsMap(
+ Uniquifier<Target> uniquifier,
+ Map<SkyKey, Iterable<SkyKey>> reverseDepsMap,
+ Callback<Target> callback,
+ Queue<Map.Entry<SkyKey, Iterable<SkyKey>>> reverseDepsQueue)
+ throws QueryException, InterruptedException {
+ Collection<Target> children = processRawReverseDeps(makeTargetsMap(reverseDepsMap));
+ Iterable<Target> currentInUniverse = Iterables.filter(children, universe);
+ ImmutableList<Target> uniqueChildren = uniquifier.unique(currentInUniverse);
+ reverseDepsMap.clear();
+
+ if (!uniqueChildren.isEmpty()) {
+ callback.process(uniqueChildren);
+ reverseDepsQueue.addAll(
+ graph.getReverseDeps(makeTransitiveTraversalKeys(uniqueChildren)).entrySet());
+ }
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
index ed49ef4f93..1fe882fae9 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
@@ -34,7 +34,6 @@ import java.util.List;
*/
// Public because SkyQueryEnvironment needs to refer to it directly.
public class AllRdepsFunction implements QueryFunction {
- public AllRdepsFunction() {}
@Override
public String getName() {
@@ -43,7 +42,7 @@ public class AllRdepsFunction implements QueryFunction {
@Override
public int getMandatoryArguments() {
- return 1; // last argument is optional
+ return 1; // last argument is optional
}
@Override
@@ -51,45 +50,50 @@ public class AllRdepsFunction implements QueryFunction {
return ImmutableList.of(ArgumentType.EXPRESSION, ArgumentType.INTEGER);
}
- /**
- * Breadth-first search from the argument while sticking to nodes satisfying the {@code universe}
- * predicate.
- */
- protected static <T> void eval(final QueryEnvironment<T> env, final List<Argument> args,
- final Callback<T> callback, final Predicate<T> universe)
- throws QueryException, InterruptedException {
- final Uniquifier<T> uniquifier = env.createUniquifier();
- final int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
- env.eval(args.get(0).getExpression(), new Callback<T>() {
- @Override
- public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
- Iterable<T> current = partialResult;
- // We need to iterate depthBound + 1 times.
- for (int i = 0; i <= depthBound; i++) {
- List<T> next = new ArrayList<>();
- // Restrict to nodes satisfying the universe predicate.
- Iterable<T> currentInUniverse = Iterables.filter(current, universe);
- // Filter already visited nodes: if we see a node in a later round, then we don't need to
- // visit it again, because the depth at which we see it must be greater than or equal to
- // the last visit.
- next.addAll(env.getReverseDeps(uniquifier.unique(currentInUniverse)));
- callback.process(currentInUniverse);
- if (next.isEmpty()) {
- // Exit when there are no more nodes to visit.
- break;
- }
- current = next;
- }
-
- }
- });
-
- }
-
- /** Breadth-first search from the argument. */
@Override
public <T> void eval(QueryEnvironment<T> env, QueryExpression expression, List<Argument> args,
Callback<T> callback) throws QueryException, InterruptedException {
eval(env, args, callback, Predicates.<T>alwaysTrue());
}
+
+ protected <T> void eval(
+ final QueryEnvironment<T> env,
+ final List<Argument> args,
+ final Callback<T> callback,
+ final Predicate<T> universe)
+ throws QueryException, InterruptedException {
+
+ final int depth = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
+ if (env instanceof StreamableQueryEnvironment<?>) {
+ ((StreamableQueryEnvironment<T>) env)
+ .getAllRdeps(args.get(0).getExpression(), universe, callback, depth);
+ } else {
+ final Uniquifier<T> uniquifier = env.createUniquifier();
+ env.eval(
+ args.get(0).getExpression(),
+ new Callback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult)
+ throws QueryException, InterruptedException {
+ Iterable<T> current = partialResult;
+ // We need to iterate depthBound + 1 times.
+ for (int i = 0; i <= depth; i++) {
+ List<T> next = new ArrayList<>();
+ // Restrict to nodes satisfying the universe predicate.
+ Iterable<T> currentInUniverse = Iterables.filter(current, universe);
+ // Filter already visited nodes: if we see a node in a later round, then we don't
+ // need to visit it again, because the depth at which we see it must be greater
+ // than or equal to the last visit.
+ next.addAll(env.getReverseDeps(uniquifier.unique(currentInUniverse)));
+ callback.process(currentInUniverse);
+ if (next.isEmpty()) {
+ // Exit when there are no more nodes to visit.
+ break;
+ }
+ current = next;
+ }
+ }
+ });
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
new file mode 100644
index 0000000000..0f0a593267
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
@@ -0,0 +1,32 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.common.base.Predicate;
+
+/**
+ * The environment of a Blaze query which supports predefined streaming operations.
+ *
+ * @param <T> the node type of the dependency graph
+ */
+public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> {
+
+ /** Retrieve and process all reverse dependencies of given expression in a streaming manner. */
+ void getAllRdeps(
+ QueryExpression expression,
+ Predicate<T> universe,
+ final Callback<T> callback,
+ final int depth)
+ throws QueryException, InterruptedException;
+}