diff options
author | Nathan Harmata <nharmata@google.com> | 2016-12-02 17:58:33 +0000 |
---|---|---|
committer | Kristina Chodorow <kchodorow@google.com> | 2016-12-02 19:09:07 +0000 |
commit | e0a330577d9fe98169645cb68d9fc22cc787eeb6 (patch) | |
tree | 9b1d3dc955baa99133511b9134715e9c73649a85 /src/main/java/com/google/devtools/build | |
parent | a110ac400190c90a45856f15482c8d0952c542f5 (diff) |
For all function expressions of the form f(..., e1, ..., e2, ..., eK, ...), ensure that all of e1, e2, ..., and eK are elligble for parallel evaluation. This is _not_ the same as providing a parallel implementation of f, which we can do separately in followup CLs.
--
PiperOrigin-RevId: 140861694
MOS_MIGRATED_REVID=140861694
Diffstat (limited to 'src/main/java/com/google/devtools/build')
10 files changed, 373 insertions, 118 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java index 696eee2230..ae4640c57c 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java @@ -263,7 +263,7 @@ public abstract class AbstractBlazeQueryEnvironment<T> // Will skip the target and keep going if -k is specified. reportBuildFileError(caller, e.getMessage()); } - AggregateAllCallback<T> aggregatingCallback = QueryUtil.newAggregateAllCallback(); + AggregateAllCallback<T> aggregatingCallback = QueryUtil.newThreadSafeAggregateAllCallback(); getTargetsMatchingPattern(caller, pattern, aggregatingCallback); return aggregatingCallback.getResult(); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java index 0a74cc0074..987d0d38b3 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java @@ -287,7 +287,7 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> @Override public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback) throws QueryException, InterruptedException { - AggregateAllCallback<Target> aggregator = QueryUtil.newAggregateAllCallback(); + AggregateAllCallback<Target> aggregator = QueryUtil.newOrderedAggregateAllCallback(); expr.eval(this, context, aggregator); callback.process(aggregator.getResult()); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 87a57af2c4..78a850b8e7 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -756,6 +756,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> return pkgResults.build(); } + @ThreadSafe @Override public void buildTransitiveClosure(QueryExpression caller, Set<Target> targets, int maxDepth) throws QueryException, InterruptedException { @@ -1235,7 +1236,26 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> eval( expression, context, - new BatchAllRdepsCallback(uniquifier, universe, callback, depth, batchSize)); + (callback instanceof ThreadSafeCallback) && (uniquifier instanceof ThreadSafeUniquifier) + ? new ThreadSafeBatchAllRdepsCallback( + (ThreadSafeUniquifier<Target>) uniquifier, + universe, + (ThreadSafeCallback<Target>) callback, + depth, + batchSize) + : new BatchAllRdepsCallback(uniquifier, universe, callback, depth, batchSize)); + } + + private class ThreadSafeBatchAllRdepsCallback + extends BatchAllRdepsCallback implements ThreadSafeCallback<Target> { + protected ThreadSafeBatchAllRdepsCallback( + ThreadSafeUniquifier<Target> uniquifier, + Predicate<Target> universe, + ThreadSafeCallback<Target> callback, + int depth, + int batchSize) { + super(uniquifier, universe, callback, depth, batchSize); + } } private class BatchAllRdepsCallback implements Callback<Target> { @@ -1245,7 +1265,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> private final int depth; private final int batchSize; - private BatchAllRdepsCallback( + protected BatchAllRdepsCallback( Uniquifier<Target> uniquifier, Predicate<Target> universe, Callback<Target> callback, diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java index 482b4cd89b..6c37e41ff7 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java @@ -18,7 +18,7 @@ import com.google.common.collect.Sets; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; - +import com.google.devtools.build.lib.query2.engine.QueryUtil.ProcessorWithUniquifier; import java.util.Collection; import java.util.List; import java.util.Set; @@ -50,39 +50,66 @@ final class DepsFunction implements QueryFunction { return ImmutableList.of(ArgumentType.EXPRESSION, ArgumentType.INTEGER); } + private static class ProcessorImpl<T> implements ProcessorWithUniquifier<T> { + private final QueryExpression expression; + private final int depthBound; + private final QueryEnvironment<T> env; + + private ProcessorImpl(QueryExpression expression, int depthBound, QueryEnvironment<T> env) { + this.expression = expression; + this.depthBound = depthBound; + this.env = env; + } + + @Override + public void process(Iterable<T> partialResult, Uniquifier<T> uniquifier, Callback<T> callback) + throws QueryException, InterruptedException { + Collection<T> current = Sets.newHashSet(partialResult); + env.buildTransitiveClosure(expression, (Set<T>) current, depthBound); + + // We need to iterate depthBound + 1 times. + for (int i = 0; i <= depthBound; i++) { + // Filter already visited nodes: if we see a node in a later round, then we don't need to + // visit it again, because the depth at which we see it at must be greater than or equal + // to the last visit. + ImmutableList<T> toProcess = uniquifier.unique(current); + callback.process(toProcess); + current = ImmutableList.copyOf(env.getFwdDeps(toProcess)); + if (current.isEmpty()) { + // Exit when there are no more nodes to visit. + break; + } + } + } + } + + private static <T> void doEval( + QueryEnvironment<T> env, + VariableContext<T> context, + QueryExpression expression, + List<Argument> args, + Callback<T> callback) throws QueryException, InterruptedException { + int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE; + env.eval( + args.get(0).getExpression(), + context, + QueryUtil.compose( + new ProcessorImpl<T>(expression, depthBound, env), + env.createUniquifier(), + callback)); + } + /** * Breadth-first search from the arguments. */ @Override public <T> void eval( - final QueryEnvironment<T> env, + QueryEnvironment<T> env, VariableContext<T> context, - final QueryExpression expression, + QueryExpression expression, List<Argument> args, - final Callback<T> callback) throws QueryException, InterruptedException { - final int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE; - final Uniquifier<T> uniquifier = env.createUniquifier(); - env.eval(args.get(0).getExpression(), context, new Callback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - Collection<T> current = Sets.newHashSet(partialResult); - env.buildTransitiveClosure(expression, (Set<T>) current, depthBound); - - // We need to iterate depthBound + 1 times. - for (int i = 0; i <= depthBound; i++) { - // Filter already visited nodes: if we see a node in a later round, then we don't need to - // visit it again, because the depth at which we see it at must be greater than or equal - // to the last visit. - ImmutableList<T> toProcess = uniquifier.unique(current); - callback.process(toProcess); - current = ImmutableList.copyOf(env.getFwdDeps(toProcess)); - if (current.isEmpty()) { - // Exit when there are no more nodes to visit. - break; - } - } - } - }); + Callback<T> callback) throws QueryException, InterruptedException { + doEval(env, context, expression, args, callback); } @Override @@ -93,6 +120,6 @@ final class DepsFunction implements QueryFunction { List<Argument> args, ThreadSafeCallback<T> callback, ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + doEval(env, context, expression, args, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java index 6c9ae89d68..c31eb7c926 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java @@ -17,7 +17,7 @@ import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; - +import com.google.devtools.build.lib.query2.engine.QueryUtil.ProcessorWithUniquifier; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ForkJoinPool; @@ -52,34 +52,61 @@ class LabelsFunction implements QueryFunction { return ImmutableList.of(ArgumentType.WORD, ArgumentType.EXPRESSION); } - @Override - public <T> void eval( - final QueryEnvironment<T> env, - VariableContext<T> context, - final QueryExpression expression, - final List<Argument> args, - final Callback<T> callback) - throws QueryException, InterruptedException { - final String attrName = args.get(0).getWord(); - final Uniquifier<T> uniquifier = env.createUniquifier(); - env.eval(args.get(1).getExpression(), context, new Callback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - for (T input : partialResult) { - if (env.getAccessor().isRule(input)) { - List<T> targets = uniquifier.unique( - env.getAccessor().getLabelListAttr(expression, input, attrName, - "in '" + attrName + "' of rule " + env.getAccessor().getLabel(input) + ": ")); - List<T> result = new ArrayList<>(targets.size()); - for (T target : targets) { - result.add(env.getOrCreate(target)); - } - callback.process(result); + private static class ProcessorImpl<T> implements ProcessorWithUniquifier<T> { + private final QueryExpression expression; + private final String attrName; + private final QueryEnvironment<T> env; + + private ProcessorImpl(QueryExpression expression, String attrName, QueryEnvironment<T> env) { + this.expression = expression; + this.attrName = attrName; + this.env = env; + } + + @Override + public void process( + Iterable<T> partialResult, + Uniquifier<T> uniquifier, + Callback<T> callback) throws QueryException, InterruptedException { + for (T input : partialResult) { + if (env.getAccessor().isRule(input)) { + List<T> targets = uniquifier.unique( + env.getAccessor().getLabelListAttr(expression, input, attrName, + "in '" + attrName + "' of rule " + env.getAccessor().getLabel(input) + ": ")); + List<T> result = new ArrayList<>(targets.size()); + for (T target : targets) { + result.add(env.getOrCreate(target)); } + callback.process(result); } - } - }); + } + } + + private static <T> void doEval( + QueryEnvironment<T> env, + VariableContext<T> context, + QueryExpression expression, + List<Argument> args, + Callback<T> callback) throws QueryException, InterruptedException { + String attrName = args.get(0).getWord(); + env.eval( + args.get(1).getExpression(), + context, + QueryUtil.compose( + new ProcessorImpl<T>(expression, attrName, env), + env.createUniquifier(), + callback)); + } + + @Override + public <T> void eval( + QueryEnvironment<T> env, + VariableContext<T> context, + QueryExpression expression, + List<Argument> args, + Callback<T> callback) throws QueryException, InterruptedException { + doEval(env, context, expression, args, callback); } @Override @@ -90,6 +117,6 @@ class LabelsFunction implements QueryFunction { List<Argument> args, ThreadSafeCallback<T> callback, ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + doEval(env, context, expression, args, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java index 509f2ffdfa..73c6aba400 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java @@ -48,7 +48,7 @@ class LoadFilesFunction implements QueryEnvironment.QueryFunction { env.eval( args.get(0).getExpression(), context, - new Callback<T>() { + new ThreadSafeCallback<T>() { @Override public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java index 2d7be2ed74..b8087c51c5 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java @@ -17,9 +17,11 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; +import com.google.common.collect.Sets; import com.google.devtools.build.lib.collect.CompactHashSet; import java.util.Collections; import java.util.Set; +import javax.annotation.concurrent.ThreadSafe; /** Several query utilities to make easier to work with query callbacks and uniquifiers. */ public final class QueryUtil { @@ -31,6 +33,11 @@ public final class QueryUtil { Set<T> getResult(); } + /** A {@link AggregateAllCallback} that is a {@link ThreadSafeCallback}. */ + public interface ThreadSafeAggregateAllCallback<T> + extends AggregateAllCallback<T>, ThreadSafeCallback<T> { + } + /** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */ public abstract static class AggregateAllOutputFormatterCallback<T> extends OutputFormatterCallback<T> implements AggregateAllCallback<T> { @@ -38,6 +45,7 @@ public final class QueryUtil { private static class AggregateAllOutputFormatterCallbackImpl<T> extends AggregateAllOutputFormatterCallback<T> { + // We only add to the CompactHashSet, so iteration order is the same as insertion order. private final Set<T> result = CompactHashSet.create(); @Override @@ -51,12 +59,27 @@ public final class QueryUtil { } } + private static class ThreadSafeAggregateAllOutputFormatterCallbackImpl<T> + implements ThreadSafeAggregateAllCallback<T> { + private final Set<T> result = Sets.newConcurrentHashSet(); + + @Override + public final void process(Iterable<T> partialResult) { + Iterables.addAll(result, partialResult); + } + + @Override + public Set<T> getResult() { + return result; + } + } + /** * Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial * results into one set. * * <p>Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with - * {@link #newAggregateAllCallback}. + * {@link #newThreadSafeAggregateAllCallback}. */ public static <T> AggregateAllOutputFormatterCallback<T> newAggregateAllOutputFormatterCallback() { @@ -66,11 +89,21 @@ public final class QueryUtil { /** * Returns a fresh {@link AggregateAllCallback}. * + * <p>Intended to be used by {@link QueryEnvironment#eval} implementations that make use of + * neither of streaming evaluation nor parallel evaluation. + */ + public static <T> AggregateAllCallback<T> newOrderedAggregateAllCallback() { + return new AggregateAllOutputFormatterCallbackImpl<>(); + } + + /** + * Returns a fresh {@link ThreadSafeAggregateAllCallback}. + * * <p>Intended to be used by {@link QueryExpression} implementations; contrast with * {@link #newAggregateAllOutputFormatterCallback}. */ - public static <T> AggregateAllCallback<T> newAggregateAllCallback() { - return new AggregateAllOutputFormatterCallbackImpl<>(); + public static <T> AggregateAllCallback<T> newThreadSafeAggregateAllCallback() { + return new ThreadSafeAggregateAllOutputFormatterCallbackImpl<>(); } /** @@ -81,7 +114,7 @@ public final class QueryUtil { public static <T> Set<T> evalAll( QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr) throws QueryException, InterruptedException { - AggregateAllCallback<T> callback = newAggregateAllCallback(); + AggregateAllCallback<T> callback = newThreadSafeAggregateAllCallback(); env.eval(expr, context, callback); return callback.getResult(); } @@ -139,4 +172,91 @@ public final class QueryUtil { return result.build(); } } + + /** + * An abstraction for sending results to a {@link Callback} based on partial query results. + * + * <p>Intended as a convenience for {@link QueryFunction#eval}/{@link QueryFunction#parEval} + * implementations. + */ + @ThreadSafe + interface Processor<T> { + /** + * Processes the given {@code partialResult} and then invokes the given {@code callback} with + * a further result. + */ + void process(Iterable<T> partialResult, Callback<T> callback) + throws QueryException, InterruptedException; + } + + /** + * Similar to {@link Processor}, except the {@link #process} function also gets access to a + * {@link Uniquifier}. + */ + @ThreadSafe + interface ProcessorWithUniquifier<T> { + /** + * Processes the given {@code partialResult} using the given {@code uniquifier} and then invokes + * the given {@code callback} with a further result. + */ + void process(Iterable<T> partialResult, Uniquifier<T> uniquifier, Callback<T> callback) + throws QueryException, InterruptedException; + } + + /** + * Returns a new {@link Callback} that composes {@code callback} with {@code processor}. That is, + * this method returns a {@code c} such that {@code c.process(r)} calls + * {@code callback.process(r, callback)}. + * + * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback} + * is as well. + */ + public static <T> Callback<T> compose( + final Processor<T> processor, + final Callback<T> callback) { + return callback instanceof ThreadSafeCallback + ? new ThreadSafeCallback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, callback); + } + } + : new Callback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, callback); + } + }; + } + + /** + * Returns a new {@link Callback} that composes {@code callback} with {@code processor} (applied + * to {@code uniquifier}). That is, this method returns a {@code c} such that {@code c.process(r)} + * calls {@code callback.process(r, uniquifier, c)}. + * + * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback} is a + * {@link ThreadSafeCallback} and {@code uniquifier} is a {@link ThreadSafeUniquifier}. + */ + public static <T> Callback<T> compose( + final ProcessorWithUniquifier<T> processor, + final Uniquifier<T> uniquifier, + final Callback<T> callback) { + return (callback instanceof ThreadSafeCallback) && (uniquifier instanceof ThreadSafeUniquifier) + ? new ThreadSafeCallback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, uniquifier, callback); + } + } + : new Callback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, uniquifier, callback); + } + }; + } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java index 7913e2dbfa..8822362749 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java @@ -19,7 +19,7 @@ import com.google.common.collect.Iterables; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; - +import com.google.devtools.build.lib.query2.engine.QueryUtil.Processor; import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; @@ -49,30 +49,51 @@ class SomeFunction implements QueryFunction { return ImmutableList.of(ArgumentType.EXPRESSION); } - @Override - public <T> void eval( + private static class ProcessorImpl<T> implements Processor<T> { + private final AtomicBoolean someFound; + + private ProcessorImpl(AtomicBoolean someFound) { + this.someFound = someFound; + } + + @Override + public void process(Iterable<T> partialResult, Callback<T> callback) + throws QueryException, InterruptedException { + if (someFound.get() || Iterables.isEmpty(partialResult)) { + return; + } + callback.process(ImmutableSet.of(partialResult.iterator().next())); + someFound.set(true); + } + } + + private static <T> void doEval( QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expression, List<Argument> args, - final Callback<T> callback) throws QueryException, InterruptedException { - final AtomicBoolean someFound = new AtomicBoolean(false); - env.eval(args.get(0).getExpression(), context, new Callback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - if (someFound.get() || Iterables.isEmpty(partialResult)) { - return; - } - callback.process(ImmutableSet.of(partialResult.iterator().next())); - someFound.set(true); - } - }); + Callback<T> callback) throws QueryException, InterruptedException { + AtomicBoolean someFound = new AtomicBoolean(false); + env.eval( + args.get(0).getExpression(), + context, + QueryUtil.compose(new ProcessorImpl<T>(someFound), callback)); if (!someFound.get()) { throw new QueryException(expression, "argument set is empty"); } } @Override + public <T> void eval( + QueryEnvironment<T> env, + VariableContext<T> context, + QueryExpression expression, + List<Argument> args, + final Callback<T> callback) throws QueryException, InterruptedException { + doEval(env, context, expression, args, callback); + } + + @Override public <T> void parEval( QueryEnvironment<T> env, VariableContext<T> context, @@ -80,6 +101,6 @@ class SomeFunction implements QueryFunction { List<Argument> args, ThreadSafeCallback<T> callback, ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + doEval(env, context, expression, args, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java index d802edd0c2..40dcfbc8b8 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java @@ -15,10 +15,12 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting; +import com.google.devtools.build.lib.query2.engine.QueryUtil.Processor; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -61,29 +63,42 @@ class TestsFunction implements QueryFunction { return ImmutableList.of(ArgumentType.EXPRESSION); } + private static class ProcessorImpl<T> implements Processor<T> { + private final Closure<T> closure; + private final QueryEnvironment<T> env; + + private ProcessorImpl(Closure<T> closure, QueryEnvironment<T> env) { + this.closure = closure; + this.env = env; + } + + @Override + public void process( + Iterable<T> partialResult, + Callback<T> callback) throws QueryException, InterruptedException { + for (T target : partialResult) { + if (env.getAccessor().isTestRule(target)) { + callback.process(ImmutableList.of(target)); + } else if (env.getAccessor().isTestSuite(target)) { + for (T test : closure.getTestsInSuite(target)) { + callback.process(ImmutableList.of(env.getOrCreate(test))); + } + } + } + } + } + @Override public <T> void eval( - final QueryEnvironment<T> env, + QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expression, List<Argument> args, - final Callback<T> callback) throws QueryException, InterruptedException { - final Closure<T> closure = new Closure<>(expression, env); - - env.eval(args.get(0).getExpression(), context, new Callback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - for (T target : partialResult) { - if (env.getAccessor().isTestRule(target)) { - callback.process(ImmutableList.of(target)); - } else if (env.getAccessor().isTestSuite(target)) { - for (T test : closure.getTestsInSuite(target)) { - callback.process(ImmutableList.of(env.getOrCreate(test))); - } - } - } - } - }); + Callback<T> callback) throws QueryException, InterruptedException { + env.eval( + args.get(0).getExpression(), + context, + QueryUtil.compose(new ProcessorImpl<T>(new Closure<>(expression, env), env), callback)); } @Override @@ -94,7 +109,10 @@ class TestsFunction implements QueryFunction { List<Argument> args, ThreadSafeCallback<T> callback, ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + env.eval( + args.get(0).getExpression(), + context, + QueryUtil.compose(new ProcessorImpl<T>(new Closure<>(expression, env), env), callback)); } /** @@ -151,10 +169,8 @@ class TestsFunction implements QueryFunction { } } - /** - * A closure over the temporary state needed to compute the expression. This makes the evaluation - * thread-safe, as long as instances of this class are used only within a single thread. - */ + /** A closure over the temporary state needed to compute the expression. */ + @ThreadSafe private static final class Closure<T> { private final QueryExpression expression; /** A dynamically-populated mapping from test_suite rules to their tests. */ @@ -177,7 +193,8 @@ class TestsFunction implements QueryFunction { * * @precondition env.getAccessor().isTestSuite(testSuite) */ - private Set<T> getTestsInSuite(T testSuite) throws QueryException, InterruptedException { + private synchronized Set<T> getTestsInSuite(T testSuite) + throws QueryException, InterruptedException { Set<T> tests = testsInSuite.get(testSuite); if (tests == null) { tests = Sets.newHashSet(); diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java index 532f331378..2504b532b1 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java @@ -18,6 +18,7 @@ import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; +import com.google.devtools.build.lib.query2.engine.QueryUtil.Processor; import java.util.List; import java.util.Set; import java.util.concurrent.ForkJoinPool; @@ -51,24 +52,46 @@ public class VisibleFunction implements QueryFunction { return ImmutableList.of(ArgumentType.EXPRESSION, ArgumentType.EXPRESSION); } + private static class ProcessorImpl<T> implements Processor<T> { + private final QueryEnvironment<T> env; + private final Set<T> toSet; + + private ProcessorImpl(QueryEnvironment<T> env, Set<T> toSet) { + this.env = env; + this.toSet = toSet; + } + + @Override + public void process(Iterable<T> partialResult, Callback<T> callback) + throws QueryException, InterruptedException { + for (T t : partialResult) { + if (visibleToAll(env, toSet, t)) { + callback.process(ImmutableList.of(t)); + } + } + } + } + + private static <T> void doEval( + QueryEnvironment<T> env, + VariableContext<T> context, + List<Argument> args, + Callback<T> callback) throws QueryException, InterruptedException { + Set<T> toSet = QueryUtil.evalAll(env, context, args.get(0).getExpression()); + env.eval( + args.get(1).getExpression(), + context, + QueryUtil.compose(new ProcessorImpl<T>(env, toSet), callback)); + } + @Override public <T> void eval( - final QueryEnvironment<T> env, + QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expression, List<Argument> args, - final Callback<T> callback) throws QueryException, InterruptedException { - final Set<T> toSet = QueryUtil.evalAll(env, context, args.get(0).getExpression()); - env.eval(args.get(1).getExpression(), context, new Callback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - for (T t : partialResult) { - if (visibleToAll(env, toSet, t)) { - callback.process(ImmutableList.of(t)); - } - } - } - }); + Callback<T> callback) throws QueryException, InterruptedException { + doEval(env, context, args, callback); } @Override @@ -79,7 +102,7 @@ public class VisibleFunction implements QueryFunction { List<Argument> args, ThreadSafeCallback<T> callback, ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + doEval(env, context, args, callback); } /** Returns true if {@code target} is visible to all targets in {@code toSet}. */ |