diff options
Diffstat (limited to 'src')
7 files changed, 70 insertions, 31 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 87f8e7de51..ae499580d3 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -94,7 +94,7 @@ class ParallelSkyQueryUtils { } /** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */ - private static class RBuildFilesVisitor extends ParallelVisitor<SkyKey> { + private static class RBuildFilesVisitor extends ParallelVisitor<SkyKey, Target> { private final SkyQueryEnvironment env; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; @@ -130,9 +130,9 @@ class ParallelSkyQueryUtils { } @Override - protected void processResultantTargets( + protected void processPartialResults( Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) - throws QueryException, InterruptedException { + throws QueryException, InterruptedException { Set<PackageIdentifier> pkgIdsNeededForResult = Streams.stream(keysToUseForResult) .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) @@ -164,7 +164,8 @@ class ParallelSkyQueryUtils { * even with 10M edges, the memory overhead is around 160M, and the memory can be reclaimed by * regular GC. */ - private static class AllRdepsUnboundedVisitor extends ParallelVisitor<Pair<SkyKey, SkyKey>> { + private static class AllRdepsUnboundedVisitor + extends ParallelVisitor<Pair<SkyKey, SkyKey>, Target> { private final SkyQueryEnvironment env; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; @@ -201,7 +202,7 @@ class ParallelSkyQueryUtils { } @Override - public ParallelVisitor<Pair<SkyKey, SkyKey>> create() { + public ParallelVisitor<Pair<SkyKey, SkyKey>, Target> create() { return new AllRdepsUnboundedVisitor(env, uniquifier, callback, packageSemaphore); } } @@ -270,9 +271,9 @@ class ParallelSkyQueryUtils { } @Override - protected void processResultantTargets( + protected void processPartialResults( Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) - throws QueryException, InterruptedException { + throws QueryException, InterruptedException { Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = env.makePackageKeyToTargetKeyMap(keysToUseForResult); Set<PackageIdentifier> pkgIdsNeededForResult = diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java index 0bb9f1b8ab..e81aadd162 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java @@ -40,11 +40,14 @@ import java.util.concurrent.TimeUnit; * * <p>The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool NOT * part of the global query evaluation pool to avoid starvation. + * + * @param <T> the type of objects to visit + * @param <V> the type of visitation results to process */ @ThreadSafe -public abstract class ParallelVisitor<T> { +public abstract class ParallelVisitor<T, V> { private final Uniquifier<T> uniquifier; - private final Callback<Target> callback; + private final Callback<V> callback; private final int visitBatchSize; private final VisitingTaskExecutor executor; @@ -80,7 +83,7 @@ public abstract class ParallelVisitor<T> { * * <p>TODO(shazh): Revisit the choice of task target based on real-prod performance. */ - private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT; + private static final long MIN_PENDING_TASKS = 3L * SkyQueryEnvironment.DEFAULT_THREAD_COUNT; /** * Fail fast on RuntimeExceptions, including {@code RuntimeInterruptedException} and {@code @@ -114,8 +117,7 @@ public abstract class ParallelVisitor<T> { /*workQueue=*/ new BlockingStack<Runnable>(), new ThreadFactoryBuilder().setNameFormat("parallel-visitor %d").build()); - protected ParallelVisitor( - Uniquifier<T> uniquifier, Callback<Target> callback, int visitBatchSize) { + protected ParallelVisitor(Uniquifier<T> uniquifier, Callback<V> callback, int visitBatchSize) { this.uniquifier = uniquifier; this.callback = callback; this.visitBatchSize = visitBatchSize; @@ -125,7 +127,7 @@ public abstract class ParallelVisitor<T> { /** Factory for {@link ParallelVisitor} instances. */ public interface Factory { - ParallelVisitor<?> create(); + ParallelVisitor<?, ?> create(); } /** @@ -154,11 +156,11 @@ public abstract class ParallelVisitor<T> { } /** - * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in - * the full visitation to the given {@link Callback}. + * Forwards the given {@code keysToUseForResult}'s contribution to the set of results in the full + * visitation to the given {@link Callback}. */ - protected abstract void processResultantTargets( - Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + protected abstract void processPartialResults( + Iterable<SkyKey> keysToUseForResult, Callback<V> callback) throws QueryException, InterruptedException; /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */ @@ -227,7 +229,7 @@ public abstract class ParallelVisitor<T> { @Override protected void process() throws QueryException, InterruptedException { - processResultantTargets(keysToUseForResult, callback); + processPartialResults(keysToUseForResult, callback); } } @@ -312,7 +314,7 @@ public abstract class ParallelVisitor<T> { @Override public void process(Iterable<Target> partialResult) throws QueryException, InterruptedException { - ParallelVisitor<?> visitor = visitorFactory.create(); + ParallelVisitor<?, ?> visitor = visitorFactory.create(); // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on // another, potentially expensive computation. Refactor to something like "processAsync". visitor.visitAndWaitForCompletion( diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 0fb0283594..3bd547ceb9 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -1247,6 +1247,15 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> this, expression, context, callback, packageSemaphore); } + @Override + public QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel( + QueryExpression expression, + VariableContext<Target> context, + List<Argument> args, + Callback<Target> callback) { + return RdepsFunction.evalWithBoundedDepth(this, context, expression, args, callback); + } + @ThreadSafe @Override public QueryTaskFuture<Void> getAllRdeps( diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java index becfe2453c..5a3b91f575 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java @@ -58,7 +58,7 @@ public class AllRdepsFunction implements QueryFunction { QueryExpression expression, List<Argument> args, Callback<T> callback) { - return eval(env, context, args, callback, Optional.<Predicate<T>>absent()); + return evalRdeps(env, context, args, callback, Optional.<Predicate<T>>absent()); } /** Evaluates rdeps query. */ @@ -101,7 +101,7 @@ public class AllRdepsFunction implements QueryFunction { }); } - protected <T> QueryTaskFuture<Void> eval( + static <T> QueryTaskFuture<Void> evalRdeps( final QueryEnvironment<T> env, VariableContext<T> context, final List<Argument> args, diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java index 73dd930ac3..7f57f796bf 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java @@ -105,7 +105,10 @@ public final class QueryUtil { return new OrderedAggregateAllOutputFormatterCallbackImpl<>(env); } - /** Returns a fresh {@link AggregateAllCallback} instance. */ + /** + * Returns a fresh {@link AggregateAllCallback} instance that aggregates all of the values into an + * {@link ThreadSafeMutableSet}. + */ public static <T> AggregateAllCallback<T, ThreadSafeMutableSet<T>> newAggregateAllCallback( QueryEnvironment<T> env) { return new AggregateAllOutputFormatterCallbackImpl<>(env); diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java index ebd4d7d106..3971474edf 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java @@ -51,10 +51,6 @@ public final class RdepsFunction extends AllRdepsFunction { .add(ArgumentType.EXPRESSION).addAll(super.getArgumentTypes()).build(); } - /** - * Compute the transitive closure of the universe, then breadth-first search from the argument - * towards the universe while staying within the transitive closure. - */ @Override public <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, @@ -62,6 +58,23 @@ public final class RdepsFunction extends AllRdepsFunction { final QueryExpression expression, final List<Argument> args, final Callback<T> callback) { + boolean isDepthUnbounded = args.size() == 2; + return (isDepthUnbounded && env instanceof StreamableQueryEnvironment) + ? ((StreamableQueryEnvironment<T>) env) + .getRdepsUnboundedInUniverseParallel(expression, context, args, callback) + : evalWithBoundedDepth(env, context, expression, args, callback); + } + + /** + * Compute the transitive closure of the universe, then breadth-first search from the argument + * towards the universe while staying within the transitive closure. + */ + public static <T> QueryTaskFuture<Void> evalWithBoundedDepth( + QueryEnvironment<T> env, + VariableContext<T> context, + QueryExpression expression, + final List<Argument> args, + Callback<T> callback) { QueryTaskFuture<ThreadSafeMutableSet<T>> universeValueFuture = QueryUtil.evalAll(env, context, args.get(0).getExpression()); Function<ThreadSafeMutableSet<T>, QueryTaskFuture<Void>> evalInUniverseAsyncFunction = @@ -75,9 +88,11 @@ public final class RdepsFunction extends AllRdepsFunction { } catch (QueryException e) { return env.immediateFailedFuture(e); } - return RdepsFunction.this.eval( + + return AllRdepsFunction.evalRdeps( env, context, args.subList(1, args.size()), callback, Optional.of(universe)); }; + return env.transformAsync(universeValueFuture, evalInUniverseAsyncFunction); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java index bb67e93ad5..b055d6ad0a 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java @@ -14,6 +14,7 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.base.Predicate; +import java.util.List; /** * The environment of a Blaze query which supports predefined streaming operations. @@ -22,7 +23,7 @@ import com.google.common.base.Predicate; */ public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> { - /** Retrieve and process all reverse dependencies of given expression in a streaming manner. */ + /** Retrieves and processes all reverse dependencies of given expression in a streaming manner. */ QueryTaskFuture<Void> getAllRdeps( QueryExpression expression, Predicate<T> universe, @@ -30,12 +31,20 @@ public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> { Callback<T> callback, int depth); + /** Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound. */ + QueryTaskFuture<Void> getAllRdepsUnboundedParallel( + QueryExpression expression, VariableContext<T> context, Callback<T> callback); + /** - * Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound, making use of the - * provided {@code forkJoinPool}. + * Similar to {@link #getAllRdepsUnboundedParallel} but finds rdeps in a universe without a depth + * depth. + * + * @param expression a "rdeps" expression without depth, such as rdeps(u, x) + * @param args two-item list containing both universe 'u' and argument set 'x' in rdeps(u, x) */ - QueryTaskFuture<Void> getAllRdepsUnboundedParallel( + QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel( QueryExpression expression, VariableContext<T> context, + List<Argument> args, Callback<T> callback); } |