aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java15
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java26
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java9
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java4
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java5
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java25
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java17
7 files changed, 70 insertions, 31 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 87f8e7de51..ae499580d3 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -94,7 +94,7 @@ class ParallelSkyQueryUtils {
}
/** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */
- private static class RBuildFilesVisitor extends ParallelVisitor<SkyKey> {
+ private static class RBuildFilesVisitor extends ParallelVisitor<SkyKey, Target> {
private final SkyQueryEnvironment env;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
@@ -130,9 +130,9 @@ class ParallelSkyQueryUtils {
}
@Override
- protected void processResultantTargets(
+ protected void processPartialResults(
Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
- throws QueryException, InterruptedException {
+ throws QueryException, InterruptedException {
Set<PackageIdentifier> pkgIdsNeededForResult =
Streams.stream(keysToUseForResult)
.map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)
@@ -164,7 +164,8 @@ class ParallelSkyQueryUtils {
* even with 10M edges, the memory overhead is around 160M, and the memory can be reclaimed by
* regular GC.
*/
- private static class AllRdepsUnboundedVisitor extends ParallelVisitor<Pair<SkyKey, SkyKey>> {
+ private static class AllRdepsUnboundedVisitor
+ extends ParallelVisitor<Pair<SkyKey, SkyKey>, Target> {
private final SkyQueryEnvironment env;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
@@ -201,7 +202,7 @@ class ParallelSkyQueryUtils {
}
@Override
- public ParallelVisitor<Pair<SkyKey, SkyKey>> create() {
+ public ParallelVisitor<Pair<SkyKey, SkyKey>, Target> create() {
return new AllRdepsUnboundedVisitor(env, uniquifier, callback, packageSemaphore);
}
}
@@ -270,9 +271,9 @@ class ParallelSkyQueryUtils {
}
@Override
- protected void processResultantTargets(
+ protected void processPartialResults(
Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
- throws QueryException, InterruptedException {
+ throws QueryException, InterruptedException {
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
env.makePackageKeyToTargetKeyMap(keysToUseForResult);
Set<PackageIdentifier> pkgIdsNeededForResult =
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java
index 0bb9f1b8ab..e81aadd162 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java
@@ -40,11 +40,14 @@ import java.util.concurrent.TimeUnit;
*
* <p>The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool NOT
* part of the global query evaluation pool to avoid starvation.
+ *
+ * @param <T> the type of objects to visit
+ * @param <V> the type of visitation results to process
*/
@ThreadSafe
-public abstract class ParallelVisitor<T> {
+public abstract class ParallelVisitor<T, V> {
private final Uniquifier<T> uniquifier;
- private final Callback<Target> callback;
+ private final Callback<V> callback;
private final int visitBatchSize;
private final VisitingTaskExecutor executor;
@@ -80,7 +83,7 @@ public abstract class ParallelVisitor<T> {
*
* <p>TODO(shazh): Revisit the choice of task target based on real-prod performance.
*/
- private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT;
+ private static final long MIN_PENDING_TASKS = 3L * SkyQueryEnvironment.DEFAULT_THREAD_COUNT;
/**
* Fail fast on RuntimeExceptions, including {@code RuntimeInterruptedException} and {@code
@@ -114,8 +117,7 @@ public abstract class ParallelVisitor<T> {
/*workQueue=*/ new BlockingStack<Runnable>(),
new ThreadFactoryBuilder().setNameFormat("parallel-visitor %d").build());
- protected ParallelVisitor(
- Uniquifier<T> uniquifier, Callback<Target> callback, int visitBatchSize) {
+ protected ParallelVisitor(Uniquifier<T> uniquifier, Callback<V> callback, int visitBatchSize) {
this.uniquifier = uniquifier;
this.callback = callback;
this.visitBatchSize = visitBatchSize;
@@ -125,7 +127,7 @@ public abstract class ParallelVisitor<T> {
/** Factory for {@link ParallelVisitor} instances. */
public interface Factory {
- ParallelVisitor<?> create();
+ ParallelVisitor<?, ?> create();
}
/**
@@ -154,11 +156,11 @@ public abstract class ParallelVisitor<T> {
}
/**
- * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in
- * the full visitation to the given {@link Callback}.
+ * Forwards the given {@code keysToUseForResult}'s contribution to the set of results in the full
+ * visitation to the given {@link Callback}.
*/
- protected abstract void processResultantTargets(
- Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+ protected abstract void processPartialResults(
+ Iterable<SkyKey> keysToUseForResult, Callback<V> callback)
throws QueryException, InterruptedException;
/** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
@@ -227,7 +229,7 @@ public abstract class ParallelVisitor<T> {
@Override
protected void process() throws QueryException, InterruptedException {
- processResultantTargets(keysToUseForResult, callback);
+ processPartialResults(keysToUseForResult, callback);
}
}
@@ -312,7 +314,7 @@ public abstract class ParallelVisitor<T> {
@Override
public void process(Iterable<Target> partialResult)
throws QueryException, InterruptedException {
- ParallelVisitor<?> visitor = visitorFactory.create();
+ ParallelVisitor<?, ?> visitor = visitorFactory.create();
// TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on
// another, potentially expensive computation. Refactor to something like "processAsync".
visitor.visitAndWaitForCompletion(
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 0fb0283594..3bd547ceb9 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -1247,6 +1247,15 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
this, expression, context, callback, packageSemaphore);
}
+ @Override
+ public QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel(
+ QueryExpression expression,
+ VariableContext<Target> context,
+ List<Argument> args,
+ Callback<Target> callback) {
+ return RdepsFunction.evalWithBoundedDepth(this, context, expression, args, callback);
+ }
+
@ThreadSafe
@Override
public QueryTaskFuture<Void> getAllRdeps(
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
index becfe2453c..5a3b91f575 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
@@ -58,7 +58,7 @@ public class AllRdepsFunction implements QueryFunction {
QueryExpression expression,
List<Argument> args,
Callback<T> callback) {
- return eval(env, context, args, callback, Optional.<Predicate<T>>absent());
+ return evalRdeps(env, context, args, callback, Optional.<Predicate<T>>absent());
}
/** Evaluates rdeps query. */
@@ -101,7 +101,7 @@ public class AllRdepsFunction implements QueryFunction {
});
}
- protected <T> QueryTaskFuture<Void> eval(
+ static <T> QueryTaskFuture<Void> evalRdeps(
final QueryEnvironment<T> env,
VariableContext<T> context,
final List<Argument> args,
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
index 73dd930ac3..7f57f796bf 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
@@ -105,7 +105,10 @@ public final class QueryUtil {
return new OrderedAggregateAllOutputFormatterCallbackImpl<>(env);
}
- /** Returns a fresh {@link AggregateAllCallback} instance. */
+ /**
+ * Returns a fresh {@link AggregateAllCallback} instance that aggregates all of the values into an
+ * {@link ThreadSafeMutableSet}.
+ */
public static <T> AggregateAllCallback<T, ThreadSafeMutableSet<T>> newAggregateAllCallback(
QueryEnvironment<T> env) {
return new AggregateAllOutputFormatterCallbackImpl<>(env);
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
index ebd4d7d106..3971474edf 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
@@ -51,10 +51,6 @@ public final class RdepsFunction extends AllRdepsFunction {
.add(ArgumentType.EXPRESSION).addAll(super.getArgumentTypes()).build();
}
- /**
- * Compute the transitive closure of the universe, then breadth-first search from the argument
- * towards the universe while staying within the transitive closure.
- */
@Override
public <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
@@ -62,6 +58,23 @@ public final class RdepsFunction extends AllRdepsFunction {
final QueryExpression expression,
final List<Argument> args,
final Callback<T> callback) {
+ boolean isDepthUnbounded = args.size() == 2;
+ return (isDepthUnbounded && env instanceof StreamableQueryEnvironment)
+ ? ((StreamableQueryEnvironment<T>) env)
+ .getRdepsUnboundedInUniverseParallel(expression, context, args, callback)
+ : evalWithBoundedDepth(env, context, expression, args, callback);
+ }
+
+ /**
+ * Compute the transitive closure of the universe, then breadth-first search from the argument
+ * towards the universe while staying within the transitive closure.
+ */
+ public static <T> QueryTaskFuture<Void> evalWithBoundedDepth(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ final List<Argument> args,
+ Callback<T> callback) {
QueryTaskFuture<ThreadSafeMutableSet<T>> universeValueFuture =
QueryUtil.evalAll(env, context, args.get(0).getExpression());
Function<ThreadSafeMutableSet<T>, QueryTaskFuture<Void>> evalInUniverseAsyncFunction =
@@ -75,9 +88,11 @@ public final class RdepsFunction extends AllRdepsFunction {
} catch (QueryException e) {
return env.immediateFailedFuture(e);
}
- return RdepsFunction.this.eval(
+
+ return AllRdepsFunction.evalRdeps(
env, context, args.subList(1, args.size()), callback, Optional.of(universe));
};
+
return env.transformAsync(universeValueFuture, evalInUniverseAsyncFunction);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
index bb67e93ad5..b055d6ad0a 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.base.Predicate;
+import java.util.List;
/**
* The environment of a Blaze query which supports predefined streaming operations.
@@ -22,7 +23,7 @@ import com.google.common.base.Predicate;
*/
public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> {
- /** Retrieve and process all reverse dependencies of given expression in a streaming manner. */
+ /** Retrieves and processes all reverse dependencies of given expression in a streaming manner. */
QueryTaskFuture<Void> getAllRdeps(
QueryExpression expression,
Predicate<T> universe,
@@ -30,12 +31,20 @@ public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> {
Callback<T> callback,
int depth);
+ /** Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound. */
+ QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
+ QueryExpression expression, VariableContext<T> context, Callback<T> callback);
+
/**
- * Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound, making use of the
- * provided {@code forkJoinPool}.
+ * Similar to {@link #getAllRdepsUnboundedParallel} but finds rdeps in a universe without a depth
+ * depth.
+ *
+ * @param expression a "rdeps" expression without depth, such as rdeps(u, x)
+ * @param args two-item list containing both universe 'u' and argument set 'x' in rdeps(u, x)
*/
- QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
+ QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel(
QueryExpression expression,
VariableContext<T> context,
+ List<Argument> args,
Callback<T> callback);
}