diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine')
5 files changed, 119 insertions, 81 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java index 5a3b91f575..6292b7f969 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java @@ -13,7 +13,6 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; -import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; @@ -58,11 +57,27 @@ public class AllRdepsFunction implements QueryFunction { QueryExpression expression, List<Argument> args, Callback<T> callback) { - return evalRdeps(env, context, args, callback, Optional.<Predicate<T>>absent()); + boolean isDepthUnbounded = args.size() == 1; + int depth = isDepthUnbounded ? Integer.MAX_VALUE : args.get(1).getInteger(); + QueryExpression argumentExpression = args.get(0).getExpression(); + if (env instanceof StreamableQueryEnvironment) { + StreamableQueryEnvironment<T> streamableEnv = (StreamableQueryEnvironment<T>) env; + return isDepthUnbounded + ? streamableEnv.getAllRdepsUnboundedParallel(argumentExpression, context, callback) + : streamableEnv.getAllRdepsBoundedParallel(argumentExpression, depth, context, callback); + } else { + return eval( + env, + argumentExpression, + Predicates.<T>alwaysTrue(), + context, + callback, + depth); + } } - /** Evaluates rdeps query. */ - public static <T> QueryTaskFuture<Void> eval( + /** Common non-parallel implementation of depth-bounded allrdeps/deps. */ + static <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, QueryExpression expression, final Predicate<T> universe, @@ -100,25 +115,4 @@ public class AllRdepsFunction implements QueryFunction { } }); } - - static <T> QueryTaskFuture<Void> evalRdeps( - final QueryEnvironment<T> env, - VariableContext<T> context, - final List<Argument> args, - final Callback<T> callback, - Optional<Predicate<T>> universeMaybe) { - final int depth = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE; - final Predicate<T> universe = universeMaybe.isPresent() - ? universeMaybe.get() - : Predicates.<T>alwaysTrue(); - if (env instanceof StreamableQueryEnvironment<?>) { - StreamableQueryEnvironment<T> streamableEnv = ((StreamableQueryEnvironment<T>) env); - return depth == Integer.MAX_VALUE && !universeMaybe.isPresent() - ? streamableEnv.getAllRdepsUnboundedParallel(args.get(0).getExpression(), context, callback) - : streamableEnv.getAllRdeps( - args.get(0).getExpression(), universe, context, callback, depth); - } else { - return eval(env, args.get(0).getExpression(), universe, context, callback, depth); - } - } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java index c62bea5754..92edef59b4 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/MinDepthUniquifier.java @@ -23,9 +23,25 @@ import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; */ @ThreadSafe public interface MinDepthUniquifier<T> { + + /** + * Returns whether {@code newElement} hasn't been seen before at depth less than or equal to + * {@code depth} by {@link #uniqueAtDepthLessThanOrEqualTo(T, int)} or + * {@link #uniqueAtDepthLessThanOrEqualTo(Iterable, int)}. + * + * <p>Please note the difference between this method and + * {@link #uniqueAtDepthLessThanOrEqualTo(T, int)}! + * + * <p>This method is inherently racy wrt {@link #uniqueAtDepthLessThanOrEqualTo(T, int)} and + * {@link #uniqueAtDepthLessThanOrEqualTo(Iterable, int)}. Only use it if you know what you are + * doing. + */ + boolean uniqueAtDepthLessThanOrEqualToPure(T newElement, int depth); + /** - * Returns the subset of {@code newElements} that haven't been seen before at depths less than or - * equal to {@code depth} + * Returns whether {@code newElement} hasn't been seen before at depth less than or equal to + * {@code depth} by {@link #uniqueAtDepthLessThanOrEqualTo(T, int)} or + * {@link #uniqueAtDepthLessThanOrEqualTo(Iterable, int)}. * * <p> There's a natural benign check-then-act race in all concurrent uses of this interface. * Imagine we have an element e, two depths d1 and d2 (with d2 < d1), and two threads T1 and T2. @@ -33,6 +49,13 @@ public interface MinDepthUniquifier<T> { * But before T1 finishes processing e, T2 may think _it's_ about to be first one to process an * element at a depth less than or equal to than d2. T1's work is probably wasted. */ + boolean uniqueAtDepthLessThanOrEqualTo(T newElement, int depth); + + /** + * Batch version of {@link #uniqueAtDepthLessThanOrEqualTo(Object, int)}. + * + * <p>The same benign check-then-act race applies here too. + */ ImmutableList<T> uniqueAtDepthLessThanOrEqualTo(Iterable<T> newElements, int depth); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java index 3c6714b943..f2a9939ff2 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java @@ -267,27 +267,41 @@ public final class QueryUtil { @Override public final ImmutableList<T> uniqueAtDepthLessThanOrEqualTo( Iterable<T> newElements, int depth) { - ImmutableList.Builder<T> result = ImmutableList.builder(); - for (T element : newElements) { - AtomicInteger newDepth = new AtomicInteger(depth); - AtomicInteger previousDepth = - alreadySeenAtDepth.putIfAbsent(extractor.extractKey(element), newDepth); - if (previousDepth != null) { + ImmutableList.Builder<T> resultBuilder = ImmutableList.builder(); + for (T newElement : newElements) { + if (uniqueAtDepthLessThanOrEqualTo(newElement, depth)) { + resultBuilder.add(newElement); + } + } + return resultBuilder.build(); + } + + @Override + public boolean uniqueAtDepthLessThanOrEqualTo(T newElement, int depth) { + AtomicInteger newDepth = new AtomicInteger(depth); + AtomicInteger previousDepth = + alreadySeenAtDepth.putIfAbsent(extractor.extractKey(newElement), newDepth); + if (previousDepth == null) { + return true; + } + if (depth < previousDepth.get()) { + synchronized (previousDepth) { if (depth < previousDepth.get()) { - synchronized (previousDepth) { - if (depth < previousDepth.get()) { - // We've seen the element before, but never at a depth this shallow. - previousDepth.set(depth); - result.add(element); - } - } + // We've seen the element before, but never at a depth this shallow. + previousDepth.set(depth); + return true; } - } else { - // We've never seen the element before. - result.add(element); } } - return result.build(); + return false; + } + + @Override + public boolean uniqueAtDepthLessThanOrEqualToPure(T newElement, int depth) { + AtomicInteger previousDepth = alreadySeenAtDepth.get(extractor.extractKey(newElement)); + return previousDepth != null + ? depth < previousDepth.get() + : true; } } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java index 3971474edf..e40960a843 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java @@ -14,7 +14,6 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.base.Function; -import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; @@ -53,35 +52,48 @@ public final class RdepsFunction extends AllRdepsFunction { @Override public <T> QueryTaskFuture<Void> eval( - final QueryEnvironment<T> env, - final VariableContext<T> context, - final QueryExpression expression, - final List<Argument> args, - final Callback<T> callback) { + QueryEnvironment<T> env, + VariableContext<T> context, + QueryExpression expression, + List<Argument> args, + Callback<T> callback) { boolean isDepthUnbounded = args.size() == 2; - return (isDepthUnbounded && env instanceof StreamableQueryEnvironment) - ? ((StreamableQueryEnvironment<T>) env) - .getRdepsUnboundedInUniverseParallel(expression, context, args, callback) - : evalWithBoundedDepth(env, context, expression, args, callback); + int depth = isDepthUnbounded ? Integer.MAX_VALUE : args.get(2).getInteger(); + QueryExpression universeExpression = args.get(0).getExpression(); + QueryExpression argumentExpression = args.get(1).getExpression(); + if (env instanceof StreamableQueryEnvironment) { + StreamableQueryEnvironment<T> streamableEnv = (StreamableQueryEnvironment<T>) env; + return isDepthUnbounded + ? streamableEnv.getRdepsUnboundedParallel( + argumentExpression, universeExpression, context, callback) + : streamableEnv.getRdepsBoundedParallel( + argumentExpression, depth, universeExpression, context, callback); + } else { + return evalWithBoundedDepth( + env, expression, context, argumentExpression, depth, universeExpression, callback); + } } /** * Compute the transitive closure of the universe, then breadth-first search from the argument * towards the universe while staying within the transitive closure. */ - public static <T> QueryTaskFuture<Void> evalWithBoundedDepth( + private static <T> QueryTaskFuture<Void> evalWithBoundedDepth( QueryEnvironment<T> env, + QueryExpression rdepsFunctionExpressionForErrorMessages, VariableContext<T> context, - QueryExpression expression, - final List<Argument> args, + QueryExpression argumentExpression, + int depth, + QueryExpression universeExpression, Callback<T> callback) { QueryTaskFuture<ThreadSafeMutableSet<T>> universeValueFuture = - QueryUtil.evalAll(env, context, args.get(0).getExpression()); + QueryUtil.evalAll(env, context, universeExpression); Function<ThreadSafeMutableSet<T>, QueryTaskFuture<Void>> evalInUniverseAsyncFunction = universeValue -> { Predicate<T> universe; try { - env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE); + env.buildTransitiveClosure( + rdepsFunctionExpressionForErrorMessages, universeValue, Integer.MAX_VALUE); universe = Predicates.in(env.getTransitiveClosure(universeValue)); } catch (InterruptedException e) { return env.immediateCancelledFuture(); @@ -89,8 +101,8 @@ public final class RdepsFunction extends AllRdepsFunction { return env.immediateFailedFuture(e); } - return AllRdepsFunction.evalRdeps( - env, context, args.subList(1, args.size()), callback, Optional.of(universe)); + return AllRdepsFunction.eval( + env, argumentExpression, universe, context, callback, depth); }; return env.transformAsync(universeValueFuture, evalInUniverseAsyncFunction); diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java index b055d6ad0a..ef34742e58 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java @@ -13,38 +13,33 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; -import com.google.common.base.Predicate; -import java.util.List; - /** * The environment of a Blaze query which supports predefined streaming operations. * * @param <T> the node type of the dependency graph */ public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> { - - /** Retrieves and processes all reverse dependencies of given expression in a streaming manner. */ - QueryTaskFuture<Void> getAllRdeps( + QueryTaskFuture<Void> getAllRdepsBoundedParallel( QueryExpression expression, - Predicate<T> universe, + int depth, VariableContext<T> context, - Callback<T> callback, - int depth); + Callback<T> callback); - /** Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound. */ QueryTaskFuture<Void> getAllRdepsUnboundedParallel( - QueryExpression expression, VariableContext<T> context, Callback<T> callback); + QueryExpression expression, + VariableContext<T> context, + Callback<T> callback); + + QueryTaskFuture<Void> getRdepsBoundedParallel( + QueryExpression expression, + int depth, + QueryExpression universe, + VariableContext<T> context, + Callback<T> callback); - /** - * Similar to {@link #getAllRdepsUnboundedParallel} but finds rdeps in a universe without a depth - * depth. - * - * @param expression a "rdeps" expression without depth, such as rdeps(u, x) - * @param args two-item list containing both universe 'u' and argument set 'x' in rdeps(u, x) - */ - QueryTaskFuture<Void> getRdepsUnboundedInUniverseParallel( + QueryTaskFuture<Void> getRdepsUnboundedParallel( QueryExpression expression, + QueryExpression universe, VariableContext<T> context, - List<Argument> args, Callback<T> callback); } |