diff options
author | 2017-02-27 21:55:56 +0000 | |
---|---|---|
committer | 2017-02-28 11:33:08 +0000 | |
commit | 822c37816ac669e51bec3853b41849a19ec5e230 (patch) | |
tree | a12e1f438342aa9ec1846089fc255bf2abb18ad3 /src/main/java | |
parent | fb64609c3f1d3492f4d80807f5d91894fa147172 (diff) |
Reimplement blaze query using an async evaluation model. Use a concurrent backend for SkyQueryEnvironment's implementation in order to achieve parallelism.
Advantages:
-New design has no flaws that the old design had.
-Code is structured so that deadlocks due to thread starvation are impossible (yup!).
Disadvantages:
-The meat of this change needs to all be in a single CL because every single QueryFunction and QueryExpression needs to be rewritten in the async style.
Still TODO:
-Fully embrace the async model in all QueryFunctions (e.g. 'rdeps', 'allpaths').
-Use concurrency in BlazeQueryEnvironment to achieve parallel evaluation for (non SkyQuery) 'blaze query' and genquery.
--
PiperOrigin-RevId: 148690279
MOS_MIGRATED_REVID=148690279
Diffstat (limited to 'src/main/java')
47 files changed, 1316 insertions, 1274 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java b/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java index c0fce7a8c4..b4674481d0 100644 --- a/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java +++ b/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java @@ -21,10 +21,10 @@ import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.pkgcache.PackageCacheOptions; import com.google.devtools.build.lib.query2.AbstractBlazeQueryEnvironment; -import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; +import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback; import com.google.devtools.build.lib.runtime.BlazeCommand; import com.google.devtools.build.lib.runtime.BlazeRuntime; import com.google.devtools.build.lib.runtime.Command; @@ -109,7 +109,7 @@ public final class FetchCommand implements BlazeCommand { // 2. Evaluate expression: try { - queryEnv.evaluateQuery(expr, new OutputFormatterCallback<Target>() { + queryEnv.evaluateQuery(expr, new ThreadSafeOutputFormatterCallback<Target>() { @Override public void processOutput(Iterable<Target> partialResult) { // Throw away the result. diff --git a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java index e997b9f78d..88f891c1f3 100644 --- a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java +++ b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java @@ -19,6 +19,9 @@ import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.cmdline.LabelValidator.BadLabelException; import com.google.devtools.build.lib.cmdline.LabelValidator.PackageAndTarget; import com.google.devtools.build.lib.util.BatchCallback; @@ -26,15 +29,12 @@ import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.util.StringUtilities; import com.google.devtools.build.lib.util.ThreadSafeBatchCallback; import com.google.devtools.build.lib.vfs.PathFragment; - import java.io.Serializable; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.concurrent.ForkJoinPool; import java.util.regex.Pattern; - import javax.annotation.concurrent.Immutable; /** @@ -157,17 +157,48 @@ public abstract class TargetPattern implements Serializable { throws TargetParsingException, E, InterruptedException; /** - * Same as {@link #eval}, but optionally making use of the given {@link ForkJoinPool} to achieve - * parallelism. + * Evaluates this {@link TargetPattern} synchronously, feeding the result to the given + * {@code callback}, and then returns an appropriate immediate {@link ListenableFuture}. + * + * <p>If the returned {@link ListenableFuture}'s {@link ListenableFuture#get} throws an + * {@link ExecutionException}, the cause will be an instance of either + * {@link TargetParsingException} or the given {@code exceptionClass}. + */ + public final <T, E extends Exception> ListenableFuture<Void> evalAdaptedForAsync( + TargetPatternResolver<T> resolver, + ImmutableSet<PathFragment> excludedSubdirectories, + ThreadSafeBatchCallback<T, E> callback, + Class<E> exceptionClass) { + try { + eval(resolver, excludedSubdirectories, callback, exceptionClass); + return Futures.immediateFuture(null); + } catch (TargetParsingException e) { + return Futures.immediateFailedFuture(e); + } catch (InterruptedException e) { + return Futures.immediateCancelledFuture(); + } catch (Exception e) { + if (exceptionClass.isInstance(e)) { + return Futures.immediateFailedFuture(exceptionClass.cast(e)); + } + throw new IllegalStateException(e); + } + } + + /** + * Returns a {@link ListenableFuture} representing the asynchronous evaluation of this + * {@link TargetPattern} that feeds the results to the given {@code callback}. + * + * <p>If the returned {@link ListenableFuture}'s {@link ListenableFuture#get} throws an + * {@link ExecutionException}, the cause will be an instance of either + * {@link TargetParsingException} or the given {@code exceptionClass}. */ - public <T, E extends Exception> void parEval( + public <T, E extends Exception> ListenableFuture<Void> evalAsync( TargetPatternResolver<T> resolver, ImmutableSet<PathFragment> excludedSubdirectories, ThreadSafeBatchCallback<T, E> callback, Class<E> exceptionClass, - ForkJoinPool forkJoinPool) - throws TargetParsingException, E, InterruptedException { - eval(resolver, excludedSubdirectories, callback, exceptionClass); + ListeningExecutorService executor) { + return evalAdaptedForAsync(resolver, excludedSubdirectories, callback, exceptionClass); } /** @@ -252,8 +283,8 @@ public abstract class TargetPattern implements Serializable { public <T, E extends Exception> void eval( TargetPatternResolver<T> resolver, ImmutableSet<PathFragment> excludedSubdirectories, - BatchCallback<T, E> callback, Class<E> exceptionClass) - throws TargetParsingException, E, InterruptedException { + BatchCallback<T, E> callback, + Class<E> exceptionClass) throws TargetParsingException, E, InterruptedException { Preconditions.checkArgument(excludedSubdirectories.isEmpty(), "Target pattern \"%s\" of type %s cannot be evaluated with excluded subdirectories: %s.", getOriginalPattern(), getType(), excludedSubdirectories); @@ -518,14 +549,13 @@ public abstract class TargetPattern implements Serializable { } @Override - public <T, E extends Exception> void parEval( + public <T, E extends Exception> ListenableFuture<Void> evalAsync( TargetPatternResolver<T> resolver, ImmutableSet<PathFragment> excludedSubdirectories, ThreadSafeBatchCallback<T, E> callback, Class<E> exceptionClass, - ForkJoinPool forkJoinPool) - throws TargetParsingException, E, InterruptedException { - resolver.findTargetsBeneathDirectoryPar( + ListeningExecutorService executor) { + return resolver.findTargetsBeneathDirectoryAsync( directory.getRepository(), getOriginalPattern(), directory.getPackageFragment().getPathString(), @@ -533,7 +563,7 @@ public abstract class TargetPattern implements Serializable { excludedSubdirectories, callback, exceptionClass, - forkJoinPool); + executor); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java index b6b384c882..38b866b68e 100644 --- a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java +++ b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java @@ -15,35 +15,38 @@ package com.google.devtools.build.lib.cmdline; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.util.BatchCallback; import com.google.devtools.build.lib.util.ThreadSafeBatchCallback; import com.google.devtools.build.lib.vfs.PathFragment; import java.util.concurrent.ForkJoinPool; /** - * A callback interface that is used during the process of converting target patterns (such as + * A callback that is used during the process of converting target patterns (such as * <code>//foo:all</code>) into one or more lists of targets (such as <code>//foo:foo, * //foo:bar</code>). During a call to {@link TargetPattern#eval}, the {@link TargetPattern} makes * calls to this interface to implement the target pattern semantics. The generic type {@code T} is * only for compile-time type safety; there are no requirements to the actual type. */ -public interface TargetPatternResolver<T> { +public abstract class TargetPatternResolver<T> { /** * Reports the given warning. */ - void warn(String msg); + public abstract void warn(String msg); /** * Returns a single target corresponding to the given label, or null. This method may only throw * an exception if the current thread was interrupted. */ - T getTargetOrNull(Label label) throws InterruptedException; + public abstract T getTargetOrNull(Label label) throws InterruptedException; /** * Returns a single target corresponding to the given label, or an empty or failed result. */ - ResolvedTargets<T> getExplicitTarget(Label label) + public abstract ResolvedTargets<T> getExplicitTarget(Label label) throws TargetParsingException, InterruptedException; /** @@ -55,7 +58,7 @@ public interface TargetPatternResolver<T> { * @param packageIdentifier the identifier of the package * @param rulesOnly whether to return rules only */ - ResolvedTargets<T> getTargetsInPackage(String originalPattern, + public abstract ResolvedTargets<T> getTargetsInPackage(String originalPattern, PackageIdentifier packageIdentifier, boolean rulesOnly) throws TargetParsingException, InterruptedException; @@ -84,7 +87,7 @@ public interface TargetPatternResolver<T> { * @param exceptionClass The class type of the parameterized exception. * @throws TargetParsingException under implementation-specific failure conditions */ - <E extends Exception> void findTargetsBeneathDirectory( + public abstract <E extends Exception> void findTargetsBeneathDirectory( RepositoryName repository, String originalPattern, String directory, @@ -98,7 +101,7 @@ public interface TargetPatternResolver<T> { * Same as {@link #findTargetsBeneathDirectory}, but optionally making use of the given * {@link ForkJoinPool} to achieve parallelism. */ - <E extends Exception> void findTargetsBeneathDirectoryPar( + public <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsync( RepositoryName repository, String originalPattern, String directory, @@ -106,19 +109,38 @@ public interface TargetPatternResolver<T> { ImmutableSet<PathFragment> excludedSubdirectories, ThreadSafeBatchCallback<T, E> callback, Class<E> exceptionClass, - ForkJoinPool forkJoinPool) - throws TargetParsingException, E, InterruptedException; + ListeningExecutorService executor) { + try { + findTargetsBeneathDirectory( + repository, + originalPattern, + directory, + rulesOnly, + excludedSubdirectories, + callback, + exceptionClass); + return Futures.immediateFuture(null); + } catch (TargetParsingException e) { + return Futures.immediateFailedFuture(e); + } catch (InterruptedException e) { + return Futures.immediateCancelledFuture(); + } catch (Exception e) { + if (exceptionClass.isInstance(e)) { + return Futures.immediateFailedFuture(e); + } + throw new IllegalStateException(e); + } + } /** * Returns true, if and only if the given package identifier corresponds to a package, i.e., a - * file with the name {@code packageName/BUILD} exists in the appropriat repository. + * file with the name {@code packageName/BUILD} exists in the appropriate repository. */ - boolean isPackage(PackageIdentifier packageIdentifier) throws InterruptedException; + public abstract boolean isPackage(PackageIdentifier packageIdentifier) + throws InterruptedException; /** * Returns the target kind of the given target, for example {@code cc_library rule}. */ - String getTargetKind(T target); - - + public abstract String getTargetKind(T target); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java index 644adcf93d..b0a18cccc9 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java @@ -24,15 +24,15 @@ import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.packages.DependencyFilter; import com.google.devtools.build.lib.packages.Target; +import com.google.devtools.build.lib.query2.engine.AbstractQueryEnvironment; import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment; import com.google.devtools.build.lib.query2.engine.QueryEvalResult; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; -import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener; import com.google.devtools.build.lib.query2.engine.QueryUtil; import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback; -import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback; +import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.VariableContext; import com.google.devtools.build.lib.util.Preconditions; import java.io.IOException; @@ -47,8 +47,7 @@ import java.util.logging.Logger; * {@link QueryEnvironment} that can evaluate queries to produce a result, and implements as much of * QueryEnvironment as possible while remaining mostly agnostic as to the objects being stored. */ -public abstract class AbstractBlazeQueryEnvironment<T> - implements QueryEnvironment<T> { +public abstract class AbstractBlazeQueryEnvironment<T> extends AbstractQueryEnvironment<T> { protected ErrorSensingEventHandler eventHandler; protected final boolean keepGoing; protected final boolean strictScope; @@ -58,7 +57,6 @@ public abstract class AbstractBlazeQueryEnvironment<T> protected final Set<Setting> settings; protected final List<QueryFunction> extraFunctions; - private final QueryExpressionEvalListener<T> evalListener; private static final Logger logger = Logger.getLogger(AbstractBlazeQueryEnvironment.class.getName()); @@ -69,8 +67,7 @@ public abstract class AbstractBlazeQueryEnvironment<T> Predicate<Label> labelFilter, ExtendedEventHandler eventHandler, Set<Setting> settings, - Iterable<QueryFunction> extraFunctions, - QueryExpressionEvalListener<T> evalListener) { + Iterable<QueryFunction> extraFunctions) { this.eventHandler = new ErrorSensingEventHandler(eventHandler); this.keepGoing = keepGoing; this.strictScope = strictScope; @@ -78,7 +75,6 @@ public abstract class AbstractBlazeQueryEnvironment<T> this.labelFilter = labelFilter; this.settings = Sets.immutableEnumSet(settings); this.extraFunctions = ImmutableList.copyOf(extraFunctions); - this.evalListener = evalListener; } private static DependencyFilter constructDependencyFilter( @@ -103,7 +99,7 @@ public abstract class AbstractBlazeQueryEnvironment<T> */ protected void evalTopLevelInternal(QueryExpression expr, OutputFormatterCallback<T> callback) throws QueryException, InterruptedException { - eval(expr, VariableContext.<T>empty(), callback); + ((QueryTaskFutureImpl<Void>) eval(expr, VariableContext.<T>empty(), callback)).getChecked(); } /** @@ -120,9 +116,9 @@ public abstract class AbstractBlazeQueryEnvironment<T> */ public QueryEvalResult evaluateQuery( QueryExpression expr, - final OutputFormatterCallback<T> callback) + ThreadSafeOutputFormatterCallback<T> callback) throws QueryException, InterruptedException, IOException { - EmptinessSensingCallback<T> emptySensingCallback = createEmptinessSensingCallback(callback); + EmptinessSensingCallback<T> emptySensingCallback = new EmptinessSensingCallback<>(callback); long startTime = System.currentTimeMillis(); // In the --nokeep_going case, errors are reported in the order in which the patterns are // specified; using a linked hash set here makes sure that the left-most error is reported. @@ -175,13 +171,6 @@ public abstract class AbstractBlazeQueryEnvironment<T> return new QueryEvalResult(!eventHandler.hasErrors(), emptySensingCallback.isEmpty()); } - private static <T> EmptinessSensingCallback<T> createEmptinessSensingCallback( - OutputFormatterCallback<T> callback) { - return (callback instanceof ThreadSafeCallback) - ? new ThreadSafeEmptinessSensingCallback<>(callback) - : new EmptinessSensingCallback<>(callback); - } - private static class EmptinessSensingCallback<T> extends OutputFormatterCallback<T> { private final OutputFormatterCallback<T> callback; private final AtomicBoolean empty = new AtomicBoolean(true); @@ -212,19 +201,11 @@ public abstract class AbstractBlazeQueryEnvironment<T> } } - private static class ThreadSafeEmptinessSensingCallback<T> - extends EmptinessSensingCallback<T> implements ThreadSafeCallback<T> { - private ThreadSafeEmptinessSensingCallback(OutputFormatterCallback<T> callback) { - super(callback); - Preconditions.checkState(callback instanceof ThreadSafeCallback); - } - } - public QueryExpression transformParsedQuery(QueryExpression queryExpression) { return queryExpression; } - public QueryEvalResult evaluateQuery(String query, OutputFormatterCallback<T> callback) + public QueryEvalResult evaluateQuery(String query, ThreadSafeOutputFormatterCallback<T> callback) throws QueryException, InterruptedException, IOException { return evaluateQuery( QueryExpression.parse(query, this), callback); @@ -256,17 +237,32 @@ public abstract class AbstractBlazeQueryEnvironment<T> return true; } - public Set<T> evalTargetPattern(QueryExpression caller, String pattern) - throws QueryException, InterruptedException { + public QueryTaskFuture<Set<T>> evalTargetPattern(QueryExpression caller, String pattern) { try { preloadOrThrow(caller, ImmutableList.of(pattern)); - } catch (TargetParsingException e) { - // Will skip the target and keep going if -k is specified. - reportBuildFileError(caller, e.getMessage()); + } catch (TargetParsingException tpe) { + try { + // Will skip the target and keep going if -k is specified. + reportBuildFileError(caller, tpe.getMessage()); + } catch (QueryException qe) { + return immediateFailedFuture(qe); + } + } catch (QueryException qe) { + return immediateFailedFuture(qe); + } catch (InterruptedException e) { + return immediateCancelledFuture(); } - AggregateAllCallback<T> aggregatingCallback = QueryUtil.newAggregateAllCallback(); - getTargetsMatchingPattern(caller, pattern, aggregatingCallback); - return aggregatingCallback.getResult(); + final AggregateAllCallback<T> aggregatingCallback = QueryUtil.newAggregateAllCallback(); + QueryTaskFuture<Void> evalFuture = + getTargetsMatchingPattern(caller, pattern, aggregatingCallback); + return whenSucceedsCall( + evalFuture, + new QueryTaskCallable<Set<T>>() { + @Override + public Set<T> call() { + return aggregatingCallback.getResult(); + } + }); } /** @@ -289,9 +285,4 @@ public abstract class AbstractBlazeQueryEnvironment<T> builder.addAll(extraFunctions); return builder.build(); } - - @Override - public QueryExpressionEvalListener<T> getEvalListener() { - return evalListener; - } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java index 867ec3d8bf..de151aba0b 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java @@ -37,18 +37,13 @@ import com.google.devtools.build.lib.pkgcache.TargetProvider; import com.google.devtools.build.lib.pkgcache.TransitivePackageLoader; import com.google.devtools.build.lib.query2.engine.Callback; import com.google.devtools.build.lib.query2.engine.DigraphQueryEvalResult; -import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.QueryEvalResult; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; -import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener; -import com.google.devtools.build.lib.query2.engine.QueryUtil; import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier; -import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback; import com.google.devtools.build.lib.query2.engine.SkyframeRestartQueryException; -import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback; +import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.Uniquifier; -import com.google.devtools.build.lib.query2.engine.VariableContext; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.vfs.PathFragment; import java.io.IOException; @@ -61,7 +56,6 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * The environment of a Blaze query. Not thread-safe. @@ -108,10 +102,8 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> Predicate<Label> labelFilter, ExtendedEventHandler eventHandler, Set<Setting> settings, - Iterable<QueryFunction> extraFunctions, - QueryExpressionEvalListener<Target> evalListener) { - super( - keepGoing, strictScope, labelFilter, eventHandler, settings, extraFunctions, evalListener); + Iterable<QueryFunction> extraFunctions) { + super(keepGoing, strictScope, labelFilter, eventHandler, settings, extraFunctions); this.targetPatternEvaluator = targetPatternEvaluator; this.transitivePackageLoader = transitivePackageLoader; this.targetProvider = targetProvider; @@ -123,7 +115,7 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> @Override public DigraphQueryEvalResult<Target> evaluateQuery( QueryExpression expr, - final OutputFormatterCallback<Target> callback) + ThreadSafeOutputFormatterCallback<Target> callback) throws QueryException, InterruptedException, IOException { eventHandler.resetErrors(); resolvedTargetPatterns.clear(); @@ -133,8 +125,19 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } @Override - public void getTargetsMatchingPattern( - QueryExpression caller, String pattern, Callback<Target> callback) + public QueryTaskFuture<Void> getTargetsMatchingPattern( + QueryExpression owner, String pattern, Callback<Target> callback) { + try { + getTargetsMatchingPatternImpl(pattern, callback); + return immediateSuccessfulFuture(null); + } catch (QueryException e) { + return immediateFailedFuture(e); + } catch (InterruptedException e) { + return immediateCancelledFuture(); + } + } + + private void getTargetsMatchingPatternImpl(String pattern, Callback<Target> callback) throws QueryException, InterruptedException { // We can safely ignore the boolean error flag. The evaluateQuery() method above wraps the // entire query computation in an error sensor. @@ -192,15 +195,6 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } @Override - public void getTargetsMatchingPatternPar( - QueryExpression caller, - String pattern, - ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - getTargetsMatchingPattern(caller, pattern, callback); - } - - @Override public Target getTarget(Label label) throws TargetNotFoundException, QueryException, InterruptedException { // Can't use strictScope here because we are expecting a target back. @@ -293,14 +287,6 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } @Override - public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback) - throws QueryException, InterruptedException { - AggregateAllCallback<Target> aggregator = QueryUtil.newAggregateAllCallback(); - expr.eval(this, context, aggregator); - callback.process(aggregator.getResult()); - } - - @Override public Uniquifier<Target> createUniquifier() { return new AbstractUniquifier<Target, Label>() { @Override diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 30a27f8e63..c3e632ebb4 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -36,10 +36,10 @@ import com.google.devtools.build.lib.concurrent.QuiescingExecutor; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; -import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback; -import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier; +import com.google.devtools.build.lib.query2.engine.Uniquifier; import com.google.devtools.build.lib.query2.engine.VariableContext; import com.google.devtools.build.lib.skyframe.PackageValue; import com.google.devtools.build.lib.skyframe.SkyFunctions; @@ -77,14 +77,13 @@ class ParallelSkyQueryUtils { * Specialized parallel variant of {@link SkyQueryEnvironment#getAllRdeps} that is appropriate * when there is no depth-bound. */ - static void getAllRdepsUnboundedParallel( + static QueryTaskFuture<Void> getAllRdepsUnboundedParallel( SkyQueryEnvironment env, QueryExpression expression, VariableContext<Target> context, - ThreadSafeCallback<Target> callback, - MultisetSemaphore<PackageIdentifier> packageSemaphore) - throws QueryException, InterruptedException { - env.eval( + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore) { + return env.eval( expression, context, new SkyKeyBFSVisitorCallback( @@ -95,10 +94,10 @@ class ParallelSkyQueryUtils { static void getRBuildFilesParallel( SkyQueryEnvironment env, Collection<PathFragment> fileIdentifiers, - ThreadSafeCallback<Target> callback, + Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) throws QueryException, InterruptedException { - ThreadSafeUniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier(); + Uniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier(); RBuildFilesVisitor visitor = new RBuildFilesVisitor(env, keyUniquifier, callback, packageSemaphore); visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers)); @@ -110,7 +109,7 @@ class ParallelSkyQueryUtils { private RBuildFilesVisitor( SkyQueryEnvironment env, - ThreadSafeUniquifier<SkyKey> uniquifier, + Uniquifier<SkyKey> uniquifier, Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { super(env, uniquifier, callback); @@ -180,8 +179,8 @@ class ParallelSkyQueryUtils { private AllRdepsUnboundedVisitor( SkyQueryEnvironment env, - ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier, - ThreadSafeCallback<Target> callback, + Uniquifier<Pair<SkyKey, SkyKey>> uniquifier, + Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { super(env, uniquifier, callback); this.packageSemaphore = packageSemaphore; @@ -190,19 +189,18 @@ class ParallelSkyQueryUtils { /** * A {@link Factory} for {@link AllRdepsUnboundedVisitor} instances, each of which will be used * to perform visitation of the reverse transitive closure of the {@link Target}s passed in a - * single {@link ThreadSafeCallback#process} call. Note that all the created - * instances share the same {@code ThreadSafeUniquifier<SkyKey>} so that we don't visit the - * same Skyframe node more than once. + * single {@link Callback#process} call. Note that all the created instances share the same + * {@link Uniquifier} so that we don't visit the same Skyframe node more than once. */ private static class Factory implements AbstractSkyKeyBFSVisitor.Factory { private final SkyQueryEnvironment env; - private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier; - private final ThreadSafeCallback<Target> callback; + private final Uniquifier<Pair<SkyKey, SkyKey>> uniquifier; + private final Callback<Target> callback; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private Factory( SkyQueryEnvironment env, - ThreadSafeCallback<Target> callback, + Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { this.env = env; this.uniquifier = env.createReverseDepSkyKeyUniquifier(); @@ -341,10 +339,10 @@ class ParallelSkyQueryUtils { } /** - * A {@link ThreadSafeCallback} whose {@link ThreadSafeCallback#process} method kicks off a BFS - * visitation via a fresh {@link AbstractSkyKeyBFSVisitor} instance. + * A {@link Callback} whose {@link Callback#process} method kicks off a BFS visitation via a fresh + * {@link AbstractSkyKeyBFSVisitor} instance. */ - private static class SkyKeyBFSVisitorCallback implements ThreadSafeCallback<Target> { + private static class SkyKeyBFSVisitorCallback implements Callback<Target> { private final AbstractSkyKeyBFSVisitor.Factory visitorFactory; private SkyKeyBFSVisitorCallback(AbstractSkyKeyBFSVisitor.Factory visitorFactory) { @@ -355,6 +353,8 @@ class ParallelSkyQueryUtils { public void process(Iterable<Target> partialResult) throws QueryException, InterruptedException { AbstractSkyKeyBFSVisitor<?> visitor = visitorFactory.create(); + // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on + // another, potentially expensive computation. Refactor to something like "processAsync". visitor.visitAndWaitForCompletion( SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult)); } @@ -370,7 +370,7 @@ class ParallelSkyQueryUtils { @ThreadSafe private abstract static class AbstractSkyKeyBFSVisitor<T> { protected final SkyQueryEnvironment env; - private final ThreadSafeUniquifier<T> uniquifier; + private final Uniquifier<T> uniquifier; private final Callback<Target> callback; private final QuiescingExecutor executor; @@ -434,7 +434,7 @@ class ParallelSkyQueryUtils { new ThreadFactoryBuilder().setNameFormat("skykey-bfs-visitor %d").build()); private AbstractSkyKeyBFSVisitor( - SkyQueryEnvironment env, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) { + SkyQueryEnvironment env, Uniquifier<T> uniquifier, Callback<Target> callback) { this.env = env; this.uniquifier = uniquifier; this.callback = callback; diff --git a/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java b/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java index f8c8327627..d43f71beb2 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java +++ b/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java @@ -24,7 +24,6 @@ import com.google.devtools.build.lib.pkgcache.TargetProvider; import com.google.devtools.build.lib.pkgcache.TransitivePackageLoader; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting; -import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory; import java.util.List; @@ -48,7 +47,6 @@ public class QueryEnvironmentFactory { ExtendedEventHandler eventHandler, Set<Setting> settings, Iterable<QueryFunction> functions, - QueryExpressionEvalListener<Target> evalListener, @Nullable PathPackageLocator packagePath) { Preconditions.checkNotNull(universeScope); if (canUseSkyQuery(orderedResults, universeScope, packagePath, strictScope, labelFilter)) { @@ -58,7 +56,6 @@ public class QueryEnvironmentFactory { eventHandler, settings, functions, - evalListener, targetPatternEvaluator.getOffset(), graphFactory, universeScope, @@ -66,7 +63,7 @@ public class QueryEnvironmentFactory { } else { return new BlazeQueryEnvironment(transitivePackageLoader, targetProvider, targetPatternEvaluator, keepGoing, strictScope, loadingPhaseThreads, labelFilter, - eventHandler, settings, functions, evalListener); + eventHandler, settings, functions); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java index 79614c24a5..e2fb0e4dfd 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java @@ -22,14 +22,12 @@ import com.google.devtools.build.lib.query2.engine.QueryEnvironment; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; -import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback; import com.google.devtools.build.lib.query2.engine.VariableContext; import com.google.devtools.build.lib.vfs.PathFragment; - import java.util.List; -import java.util.concurrent.ForkJoinPool; /** * An "rbuildfiles" query expression, which computes the set of packages (as represented by their @@ -69,36 +67,19 @@ public class RBuildFilesFunction implements QueryFunction { @Override @SuppressWarnings("unchecked") // Cast from <Target> to <T>. This will only be used with <Target>. - public <T> void eval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - Callback<T> callback) throws QueryException, InterruptedException { - if (!(env instanceof SkyQueryEnvironment)) { - throw new QueryException("rbuildfiles can only be used with SkyQueryEnvironment"); - } - ((SkyQueryEnvironment) env) - .getRBuildFiles( - Collections2.transform(args, ARGUMENT_TO_PATH_FRAGMENT), (Callback<Target>) callback); - } - - @SuppressWarnings("unchecked") - @Override - public <T> void parEval( + public <T> QueryTaskFuture<Void> eval( QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expression, List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { + Callback<T> callback) { if (!(env instanceof SkyQueryEnvironment)) { - throw new QueryException("rbuildfiles can only be used with SkyQueryEnvironment"); + return env.immediateFailedFuture( + new QueryException("rbuildfiles can only be used with SkyQueryEnvironment")); } - ((SkyQueryEnvironment) env) - .getRBuildFilesParallel( - Collections2.transform(args, ARGUMENT_TO_PATH_FRAGMENT), - (ThreadSafeCallback<Target>) callback, - forkJoinPool); + SkyQueryEnvironment skyEnv = ((SkyQueryEnvironment) env); + return skyEnv.getRBuildFilesParallel( + Collections2.transform(args, ARGUMENT_TO_PATH_FRAGMENT), + (Callback<Target>) callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 3e1410f582..565b6ae519 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -29,14 +29,20 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.LabelSyntaxException; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPattern; import com.google.devtools.build.lib.collect.CompactHashSet; +import com.google.devtools.build.lib.concurrent.BlockingStack; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; -import com.google.devtools.build.lib.concurrent.NamedForkJoinPool; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.ExtendedEventHandler; @@ -56,17 +62,17 @@ import com.google.devtools.build.lib.query2.engine.AllRdepsFunction; import com.google.devtools.build.lib.query2.engine.Callback; import com.google.devtools.build.lib.query2.engine.FunctionExpression; import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.query2.engine.QueryEvalResult; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; -import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener; import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper; -import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractThreadSafeUniquifier; +import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier; import com.google.devtools.build.lib.query2.engine.RdepsFunction; import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment; import com.google.devtools.build.lib.query2.engine.TargetLiteral; -import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback; -import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier; +import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.Uniquifier; import com.google.devtools.build.lib.query2.engine.VariableContext; import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue; @@ -106,7 +112,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -117,6 +125,10 @@ import javax.annotation.Nullable; * reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in any * particular order. As well, this class eagerly loads the full transitive closure of targets, even * if the full closure isn't needed. + * + * <p>This class has concurrent implementations of the + * {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods. The combination of this and the + * asynchronous evaluation model yields parallel query evaluation. */ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> implements StreamableQueryEnvironment<Target> { @@ -140,7 +152,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> protected WalkableGraph graph; private InterruptibleSupplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier; private GraphBackedRecursivePackageProvider graphBackedRecursivePackageProvider; - private ForkJoinPool forkJoinPool; + private ListeningExecutorService executor; private RecursivePackageProviderBackedTargetPatternResolver resolver; private final SkyKey universeKey; private final ImmutableList<TargetPatternKey> universeTargetPatternKeys; @@ -151,7 +163,6 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> ExtendedEventHandler eventHandler, Set<Setting> settings, Iterable<QueryFunction> extraFunctions, - QueryExpressionEvalListener<Target> evalListener, String parserPrefix, WalkableGraphFactory graphFactory, List<String> universeScope, @@ -165,7 +176,6 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> eventHandler, settings, extraFunctions, - evalListener, parserPrefix, graphFactory, universeScope, @@ -179,7 +189,6 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> ExtendedEventHandler eventHandler, Set<Setting> settings, Iterable<QueryFunction> extraFunctions, - QueryExpressionEvalListener<Target> evalListener, String parserPrefix, WalkableGraphFactory graphFactory, List<String> universeScope, @@ -190,8 +199,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> /*labelFilter=*/ Rule.ALL_LABELS, eventHandler, settings, - extraFunctions, - evalListener); + extraFunctions); this.loadingPhaseThreads = loadingPhaseThreads; this.graphFactory = graphFactory; this.pkgPath = pkgPath; @@ -224,9 +232,15 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> graphBackedRecursivePackageProvider = new GraphBackedRecursivePackageProvider(graph, universeTargetPatternKeys, pkgPath); } - if (forkJoinPool == null) { - forkJoinPool = - NamedForkJoinPool.newNamedPool("QueryEnvironment", queryEvaluationParallelismLevel); + if (executor == null) { + executor = MoreExecutors.listeningDecorator( + new ThreadPoolExecutor( + /*corePoolSize=*/ queryEvaluationParallelismLevel, + /*maximumPoolSize=*/ queryEvaluationParallelismLevel, + /*keepAliveTime=*/ 1, + /*units=*/ TimeUnit.SECONDS, + /*workQueue=*/ new BlockingStack<Runnable>(), + new ThreadFactoryBuilder().setNameFormat("QueryEnvironment %d").build())); } resolver = new RecursivePackageProviderBackedTargetPatternResolver( @@ -336,16 +350,17 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } catch (Throwable throwable) { throwableToThrow = throwable; } finally { - if (throwableToThrow != null) { - LOG.log(Level.INFO, "About to shutdown FJP because of throwable", throwableToThrow); + if (throwableToThrow != null) { + LOG.log( + Level.INFO, + "About to shutdown query threadpool because of throwable", + throwableToThrow); // Force termination of remaining tasks if evaluation failed abruptly (e.g. was // interrupted). We don't want to leave any dangling threads running tasks. - forkJoinPool.shutdownNow(); - } - forkJoinPool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS); - if (throwableToThrow != null) { - // Signal that pool must be recreated on the next invocation. - forkJoinPool = null; + executor.shutdownNow(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS); + // Signal that executor must be recreated on the next invocation. + executor = null; Throwables.propagateIfPossible( throwableToThrow, QueryException.class, InterruptedException.class); } @@ -354,7 +369,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> @Override public QueryEvalResult evaluateQuery( - QueryExpression expr, OutputFormatterCallback<Target> callback) + QueryExpression expr, ThreadSafeOutputFormatterCallback<Target> callback) throws QueryException, InterruptedException, IOException { // Some errors are reported as QueryExceptions and others as ERROR events (if --keep_going). The // result is set to have an error iff there were errors emitted during the query, so we reset @@ -565,38 +580,79 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> return null; } + private <R> ListenableFuture<R> safeSubmit(Callable<R> callable) { + try { + return executor.submit(callable); + } catch (RejectedExecutionException e) { + return Futures.immediateCancelledFuture(); + } + } + @ThreadSafe @Override - public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback) - throws QueryException, InterruptedException { - // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for - // all QueryEnvironment implementations. - if (callback instanceof ThreadSafeCallback) { - expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, forkJoinPool); - } else { - expr.eval(this, context, callback); - } + public QueryTaskFuture<Void> eval( + final QueryExpression expr, + final VariableContext<Target> context, + final Callback<Target> callback) { + // TODO(bazel-team): As in here, use concurrency for the async #eval of other QueryEnvironment + // implementations. + Callable<QueryTaskFutureImpl<Void>> task = new Callable<QueryTaskFutureImpl<Void>>() { + @Override + public QueryTaskFutureImpl<Void> call() { + return (QueryTaskFutureImpl<Void>) expr.eval(SkyQueryEnvironment.this, context, callback); + } + }; + ListenableFuture<QueryTaskFutureImpl<Void>> futureFuture = safeSubmit(task); + return QueryTaskFutureImpl.ofDelegate(Futures.dereference(futureFuture)); + } + + @Override + public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) { + return QueryTaskFutureImpl.ofDelegate(safeSubmit(callable)); + } + + @Override + public <T1, T2> QueryTaskFuture<T2> transformAsync( + QueryTaskFuture<T1> future, + final Function<T1, QueryTaskFuture<T2>> function) { + return QueryTaskFutureImpl.ofDelegate( + Futures.transformAsync( + (QueryTaskFutureImpl<T1>) future, + new AsyncFunction<T1, T2>() { + @Override + public ListenableFuture<T2> apply(T1 input) { + return (QueryTaskFutureImpl<T2>) function.apply(input); + } + }, + executor)); + } + + @Override + public <R> QueryTaskFuture<R> whenAllSucceedCall( + Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) { + return QueryTaskFutureImpl.ofDelegate( + Futures.whenAllSucceed(cast(futures)).call(callable, executor)); } @ThreadSafe @Override - public ThreadSafeUniquifier<Target> createUniquifier() { + public Uniquifier<Target> createUniquifier() { return createTargetUniquifier(); } @ThreadSafe - ThreadSafeUniquifier<Target> createTargetUniquifier() { - return new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT); + Uniquifier<Target> createTargetUniquifier() { + return new TargetUniquifier(DEFAULT_THREAD_COUNT); } @ThreadSafe - ThreadSafeUniquifier<SkyKey> createSkyKeyUniquifier() { - return new ThreadSafeSkyKeyUniquifier(DEFAULT_THREAD_COUNT); + Uniquifier<SkyKey> createSkyKeyUniquifier() { + return new SkyKeyUniquifier(DEFAULT_THREAD_COUNT); } @ThreadSafe - ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() { - return new ThreadSafeReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT); + Uniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() { + return new ReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT); } private Pair<TargetPattern, ImmutableSet<PathFragment>> getPatternAndExcludes(String pattern) @@ -613,41 +669,44 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> @ThreadSafe @Override - public void getTargetsMatchingPattern( - QueryExpression owner, String pattern, Callback<Target> callback) - throws QueryException, InterruptedException { + public QueryTaskFuture<Void> getTargetsMatchingPattern( + final QueryExpression owner, String pattern, Callback<Target> callback) { // Directly evaluate the target pattern, making use of packages in the graph. + Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude; try { - Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude = - getPatternAndExcludes(pattern); - TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst(); - ImmutableSet<PathFragment> subdirectoriesToExclude = - patternToEvalAndSubdirectoriesToExclude.getSecond(); - patternToEval.eval(resolver, subdirectoriesToExclude, callback, QueryException.class); - } catch (TargetParsingException e) { - reportBuildFileError(owner, e.getMessage()); - } - } - - @Override - public void getTargetsMatchingPatternPar( - QueryExpression owner, - String pattern, - ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - // Directly evaluate the target pattern, making use of packages in the graph. - try { - Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude = - getPatternAndExcludes(pattern); - TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst(); - ImmutableSet<PathFragment> subdirectoriesToExclude = - patternToEvalAndSubdirectoriesToExclude.getSecond(); - patternToEval.parEval( - resolver, subdirectoriesToExclude, callback, QueryException.class, forkJoinPool); - } catch (TargetParsingException e) { - reportBuildFileError(owner, e.getMessage()); + patternToEvalAndSubdirectoriesToExclude = getPatternAndExcludes(pattern); + } catch (TargetParsingException tpe) { + try { + reportBuildFileError(owner, tpe.getMessage()); + } catch (QueryException qe) { + return immediateFailedFuture(qe); + } + return immediateSuccessfulFuture(null); + } catch (InterruptedException ie) { + return immediateCancelledFuture(); } + TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst(); + ImmutableSet<PathFragment> subdirectoriesToExclude = + patternToEvalAndSubdirectoriesToExclude.getSecond(); + AsyncFunction<TargetParsingException, Void> reportBuildFileErrorAsyncFunction = + new AsyncFunction<TargetParsingException, Void>() { + @Override + public ListenableFuture<Void> apply(TargetParsingException exn) throws QueryException { + reportBuildFileError(owner, exn.getMessage()); + return Futures.immediateFuture(null); + } + }; + ListenableFuture<Void> evalFuture = patternToEval.evalAsync( + resolver, + subdirectoriesToExclude, + callback, + QueryException.class, + executor); + return QueryTaskFutureImpl.ofDelegate( + Futures.catchingAsync( + evalFuture, + TargetParsingException.class, + reportBuildFileErrorAsyncFunction)); } @ThreadSafe @@ -1018,12 +1077,17 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } @ThreadSafe - void getRBuildFilesParallel( - Collection<PathFragment> fileIdentifiers, - ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, packageSemaphore); + QueryTaskFuture<Void> getRBuildFilesParallel( + final Collection<PathFragment> fileIdentifiers, + final Callback<Target> callback) { + return QueryTaskFutureImpl.ofDelegate(safeSubmit(new Callable<Void>() { + @Override + public Void call() throws QueryException, InterruptedException { + ParallelSkyQueryUtils.getRBuildFilesParallel( + SkyQueryEnvironment.this, fileIdentifiers, callback, packageSemaphore); + return null; + } + })); } /** @@ -1031,41 +1095,50 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> * on the given list of BUILD files and subincludes (other files are filtered out). */ @ThreadSafe - void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback) - throws QueryException, InterruptedException { - Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers); - Uniquifier<SkyKey> keyUniquifier = new ThreadSafeSkyKeyUniquifier(/*concurrencyLevel=*/ 1); - Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet()); - Set<SkyKey> resultKeys = CompactHashSet.create(); - while (!current.isEmpty()) { - Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values(); - current = new HashSet<>(); - for (SkyKey rdep : Iterables.concat(reverseDeps)) { - if (rdep.functionName().equals(SkyFunctions.PACKAGE)) { - resultKeys.add(rdep); - // Every package has a dep on the external package, so we need to include those edges too. - if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) { + QueryTaskFuture<Void> getRBuildFiles( + Collection<PathFragment> fileIdentifiers, Callback<Target> callback) { + try { + Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers); + Uniquifier<SkyKey> keyUniquifier = new SkyKeyUniquifier(/*concurrencyLevel=*/ 1); + Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet()); + Set<SkyKey> resultKeys = CompactHashSet.create(); + while (!current.isEmpty()) { + Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values(); + current = new HashSet<>(); + for (SkyKey rdep : Iterables.concat(reverseDeps)) { + if (rdep.functionName().equals(SkyFunctions.PACKAGE)) { + resultKeys.add(rdep); + // Every package has a dep on the external package, so we need to include those edges + // too. + if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) { + if (keyUniquifier.unique(rdep)) { + current.add(rdep); + } + } + } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) { + // Packages may depend on the existence of subpackages, but these edges aren't relevant + // to rbuildfiles. if (keyUniquifier.unique(rdep)) { current.add(rdep); } } - } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) { - // Packages may depend on the existence of subpackages, but these edges aren't relevant to - // rbuildfiles. - if (keyUniquifier.unique(rdep)) { - current.add(rdep); - } } - } - if (resultKeys.size() >= BATCH_CALLBACK_SIZE) { - for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) { - callback.process( - getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values())); + if (resultKeys.size() >= BATCH_CALLBACK_SIZE) { + for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) { + callback.process( + getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values())); + } + resultKeys.clear(); } - resultKeys.clear(); } + callback.process( + getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values())); + return immediateSuccessfulFuture(null); + } catch (QueryException e) { + return immediateFailedFuture(e); + } catch (InterruptedException e) { + return immediateCancelledFuture(); } - callback.process(getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values())); } @Override @@ -1093,9 +1166,8 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } } - private static class ThreadSafeTargetUniquifier - extends AbstractThreadSafeUniquifier<Target, Label> { - protected ThreadSafeTargetUniquifier(int concurrencyLevel) { + private static class TargetUniquifier extends AbstractUniquifier<Target, Label> { + protected TargetUniquifier(int concurrencyLevel) { super(concurrencyLevel); } @@ -1105,9 +1177,8 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } } - private static class ThreadSafeSkyKeyUniquifier - extends AbstractThreadSafeUniquifier<SkyKey, SkyKey> { - protected ThreadSafeSkyKeyUniquifier(int concurrencyLevel) { + private static class SkyKeyUniquifier extends AbstractUniquifier<SkyKey, SkyKey> { + protected SkyKeyUniquifier(int concurrencyLevel) { super(concurrencyLevel); } @@ -1121,9 +1192,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> * A uniquifer which takes a pair of parent and reverse dep, and uniquify based on the second * element (reverse dep). */ - private static class ThreadSafeReverseDepSkyKeyUniquifier - extends AbstractThreadSafeUniquifier<Pair<SkyKey, SkyKey>, SkyKey> { - protected ThreadSafeReverseDepSkyKeyUniquifier(int concurrencyLevel) { + private static class ReverseDepSkyKeyUniquifier + extends AbstractUniquifier<Pair<SkyKey, SkyKey>, SkyKey> { + protected ReverseDepSkyKeyUniquifier(int concurrencyLevel) { super(concurrencyLevel); } @@ -1146,18 +1217,25 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> * <p>This callback may be called from multiple threads concurrently. At most one thread will call * the wrapped {@code callback} concurrently. */ - @ThreadSafe - private static class BatchStreamedCallback extends OutputFormatterCallback<Target> - implements ThreadSafeCallback<Target> { - - private final OutputFormatterCallback<Target> callback; - private final ThreadSafeUniquifier<Target> uniquifier = - new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT); + // TODO(nharmata): For queries with less than {@code batchThreshold} results, this batching + // strategy probably hurts performance since we can only start formatting results once the entire + // query is finished. + private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback<Target> + implements Callback<Target> { + + // TODO(nharmata): Now that we know the wrapped callback is ThreadSafe, there's no correctness + // concern that requires the prohibition of concurrent uses of the callback; the only concern is + // memory. We should have a threshold for when to invoke the callback with a batch, and also a + // separate, larger, bound on the number of targets being processed at the same time. + private final ThreadSafeOutputFormatterCallback<Target> callback; + private final Uniquifier<Target> uniquifier = new TargetUniquifier(DEFAULT_THREAD_COUNT); private final Object pendingLock = new Object(); private List<Target> pending = new ArrayList<>(); private int batchThreshold; - private BatchStreamedCallback(OutputFormatterCallback<Target> callback, int batchThreshold) { + private BatchStreamedCallback( + ThreadSafeOutputFormatterCallback<Target> callback, + int batchThreshold) { this.callback = callback; this.batchThreshold = batchThreshold; } @@ -1201,26 +1279,23 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> @ThreadSafe @Override - public void getAllRdepsUnboundedParallel( + public QueryTaskFuture<Void> getAllRdepsUnboundedParallel( QueryExpression expression, VariableContext<Target> context, - ThreadSafeCallback<Target> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - ParallelSkyQueryUtils.getAllRdepsUnboundedParallel( + Callback<Target> callback) { + return ParallelSkyQueryUtils.getAllRdepsUnboundedParallel( this, expression, context, callback, packageSemaphore); } @ThreadSafe @Override - public void getAllRdeps( + public QueryTaskFuture<Void> getAllRdeps( QueryExpression expression, Predicate<Target> universe, VariableContext<Target> context, Callback<Target> callback, - int depth) - throws QueryException, InterruptedException { - getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE); + int depth) { + return getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE); } /** @@ -1230,16 +1305,15 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> * nodes are directly depended on by a large number of other nodes. */ @VisibleForTesting - protected void getAllRdeps( + protected QueryTaskFuture<Void> getAllRdeps( QueryExpression expression, Predicate<Target> universe, VariableContext<Target> context, Callback<Target> callback, int depth, - int batchSize) - throws QueryException, InterruptedException { + int batchSize) { Uniquifier<Target> uniquifier = createUniquifier(); - eval( + return eval( expression, context, new BatchAllRdepsCallback(uniquifier, universe, callback, depth, batchSize)); diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java new file mode 100644 index 0000000000..62fd91b56f --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java @@ -0,0 +1,194 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.query2.engine; + +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; +import com.google.devtools.build.lib.util.Preconditions; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A partial implementation of {@link QueryEnvironment} that has trivial in-thread implementations + * of all the {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods. + */ +public abstract class AbstractQueryEnvironment<T> implements QueryEnvironment<T> { + /** Concrete implementation of {@link QueryTaskFuture}. */ + protected static final class QueryTaskFutureImpl<T> + extends QueryTaskFutureImplBase<T> implements ListenableFuture<T> { + private final ListenableFuture<T> delegate; + + private QueryTaskFutureImpl(ListenableFuture<T> delegate) { + this.delegate = delegate; + } + + public static <R> QueryTaskFutureImpl<R> ofDelegate(ListenableFuture<R> delegate) { + return (delegate instanceof QueryTaskFutureImpl) + ? (QueryTaskFutureImpl<R>) delegate + : new QueryTaskFutureImpl<>(delegate); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + + @Override + public void addListener(Runnable listener, Executor executor) { + delegate.addListener(listener, executor); + } + + @Override + public T getIfSuccessful() { + Preconditions.checkState(delegate.isDone()); + try { + return delegate.get(); + } catch (CancellationException | InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + } + + public T getChecked() throws InterruptedException, QueryException { + try { + return get(); + } catch (CancellationException e) { + throw new InterruptedException(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + Throwables.propagateIfPossible(cause, QueryException.class); + Throwables.propagateIfPossible(cause, InterruptedException.class); + throw new IllegalStateException(e.getCause()); + } + } + } + + @Override + public <R> QueryTaskFuture<R> immediateSuccessfulFuture(R value) { + return new QueryTaskFutureImpl<>(Futures.immediateFuture(value)); + } + + @Override + public <R> QueryTaskFuture<R> immediateFailedFuture(QueryException e) { + return new QueryTaskFutureImpl<>(Futures.<R>immediateFailedFuture(e)); + } + + @Override + public <R> QueryTaskFuture<R> immediateCancelledFuture() { + return new QueryTaskFutureImpl<>(Futures.<R>immediateCancelledFuture()); + } + + @Override + public QueryTaskFuture<Void> eval( + QueryExpression expr, VariableContext<T> context, Callback<T> callback) { + return expr.eval(this, context, callback); + } + + @Override + public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) { + try { + return immediateSuccessfulFuture(callable.call()); + } catch (QueryException e) { + return immediateFailedFuture(e); + } catch (InterruptedException e) { + return immediateCancelledFuture(); + } + } + + @Override + public <R> QueryTaskFuture<R> whenSucceedsCall( + QueryTaskFuture<?> future, QueryTaskCallable<R> callable) { + return whenAllSucceedCall(ImmutableList.of(future), callable); + } + + private static class Dummy implements QueryTaskCallable<Void> { + public static final Dummy INSTANCE = new Dummy(); + + private Dummy() {} + + @Override + public Void call() { + return null; + } + } + + @Override + public QueryTaskFuture<Void> whenAllSucceed(Iterable<? extends QueryTaskFuture<?>> futures) { + return whenAllSucceedCall(futures, Dummy.INSTANCE); + } + + @Override + public <R> QueryTaskFuture<R> whenAllSucceedCall( + Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) { + return QueryTaskFutureImpl.ofDelegate( + Futures.whenAllSucceed(cast(futures)).call(callable)); + } + + @Override + public <T1, T2> QueryTaskFuture<T2> transformAsync( + QueryTaskFuture<T1> future, + final Function<T1, QueryTaskFuture<T2>> function) { + return QueryTaskFutureImpl.ofDelegate( + Futures.transformAsync( + (QueryTaskFutureImpl<T1>) future, + new AsyncFunction<T1, T2>() { + @Override + public ListenableFuture<T2> apply(T1 input) throws Exception { + return (QueryTaskFutureImpl<T2>) function.apply(input); + } + })); + } + + protected static Iterable<QueryTaskFutureImpl<?>> cast( + Iterable<? extends QueryTaskFuture<?>> futures) { + return Iterables.transform( + futures, + new Function<QueryTaskFuture<?>, QueryTaskFutureImpl<?>>() { + @Override + public QueryTaskFutureImpl<?> apply(QueryTaskFuture<?> future) { + return (QueryTaskFutureImpl<?>) future; + } + }); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java index adc12d278f..81be4c8ca2 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java @@ -21,12 +21,12 @@ import com.google.common.collect.Sets; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; - +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * Implementation of the <code>allpaths()</code> function. @@ -51,46 +51,47 @@ public class AllPathsFunction implements QueryFunction { } @Override - public <T> void eval( - QueryEnvironment<T> env, + public <T> QueryTaskFuture<Void> eval( + final QueryEnvironment<T> env, VariableContext<T> context, - QueryExpression expression, + final QueryExpression expression, List<Argument> args, - Callback<T> callback) throws QueryException, InterruptedException { - - Set<T> fromValue = QueryUtil.evalAll(env, context, args.get(0).getExpression()); - Set<T> toValue = QueryUtil.evalAll(env, context, args.get(1).getExpression()); + final Callback<T> callback) { + final QueryTaskFuture<Set<T>> fromValueFuture = + QueryUtil.evalAll(env, context, args.get(0).getExpression()); + final QueryTaskFuture<Set<T>> toValueFuture = + QueryUtil.evalAll(env, context, args.get(1).getExpression()); - // Algorithm: compute "reachableFromX", the forward transitive closure of - // the "from" set, then find the intersection of "reachableFromX" with the - // reverse transitive closure of the "to" set. The reverse transitive - // closure and intersection operations are interleaved for efficiency. - // "result" holds the intersection. + return env.whenAllSucceedCall( + ImmutableList.of(fromValueFuture, toValueFuture), + new QueryTaskCallable<Void>() { + @Override + public Void call() throws QueryException, InterruptedException { + // Algorithm: compute "reachableFromX", the forward transitive closure of + // the "from" set, then find the intersection of "reachableFromX" with the + // reverse transitive closure of the "to" set. The reverse transitive + // closure and intersection operations are interleaved for efficiency. + // "result" holds the intersection. - env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE); + Set<T> fromValue = fromValueFuture.getIfSuccessful(); + Set<T> toValue = toValueFuture.getIfSuccessful(); - Set<T> reachableFromX = env.getTransitiveClosure(fromValue); - Predicate<T> reachable = Predicates.in(reachableFromX); - Uniquifier<T> uniquifier = env.createUniquifier(); - Collection<T> result = uniquifier.unique(intersection(reachableFromX, toValue)); - callback.process(result); - Collection<T> worklist = result; - while (!worklist.isEmpty()) { - Collection<T> reverseDeps = env.getReverseDeps(worklist); - worklist = uniquifier.unique(Iterables.filter(reverseDeps, reachable)); - callback.process(worklist); - } - } + env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE); - @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + Set<T> reachableFromX = env.getTransitiveClosure(fromValue); + Predicate<T> reachable = Predicates.in(reachableFromX); + Uniquifier<T> uniquifier = env.createUniquifier(); + Collection<T> result = uniquifier.unique(intersection(reachableFromX, toValue)); + callback.process(result); + Collection<T> worklist = result; + while (!worklist.isEmpty()) { + Collection<T> reverseDeps = env.getReverseDeps(worklist); + worklist = uniquifier.unique(Iterables.filter(reverseDeps, reachable)); + callback.process(worklist); + } + return null; + } + }); } /** diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java index 518b67497b..f800d804dc 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java @@ -13,6 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; @@ -20,10 +21,9 @@ import com.google.common.collect.Iterables; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; - +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ForkJoinPool; /** * An "allrdeps" query expression, which computes the reverse dependencies of the argument within @@ -52,30 +52,34 @@ public class AllRdepsFunction implements QueryFunction { } @Override - public <T> void eval( + public <T> QueryTaskFuture<Void> eval( QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expression, List<Argument> args, - Callback<T> callback) throws QueryException, InterruptedException { - eval(env, context, args, callback, Predicates.<T>alwaysTrue()); + Callback<T> callback) { + return eval(env, context, args, callback, Optional.<Predicate<T>>absent()); } - protected <T> void eval( + protected <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, VariableContext<T> context, final List<Argument> args, final Callback<T> callback, - final Predicate<T> universe) - throws QueryException, InterruptedException { - + Optional<Predicate<T>> universeMaybe) { final int depth = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE; + final Predicate<T> universe = universeMaybe.isPresent() + ? universeMaybe.get() + : Predicates.<T>alwaysTrue(); if (env instanceof StreamableQueryEnvironment<?>) { - ((StreamableQueryEnvironment<T>) env) - .getAllRdeps(args.get(0).getExpression(), universe, context, callback, depth); + StreamableQueryEnvironment<T> streamableEnv = ((StreamableQueryEnvironment<T>) env); + return depth == Integer.MAX_VALUE && !universeMaybe.isPresent() + ? streamableEnv.getAllRdepsUnboundedParallel(args.get(0).getExpression(), context, callback) + : streamableEnv.getAllRdeps( + args.get(0).getExpression(), universe, context, callback, depth); } else { final Uniquifier<T> uniquifier = env.createUniquifier(); - env.eval( + return env.eval( args.get(0).getExpression(), context, new Callback<T>() { @@ -103,21 +107,4 @@ public class AllRdepsFunction implements QueryFunction { }); } } - - @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - boolean unbounded = args.size() == 1; - if (unbounded && env instanceof StreamableQueryEnvironment<?>) { - ((StreamableQueryEnvironment<T>) env).getAllRdepsUnboundedParallel( - args.get(0).getExpression(), context, callback, forkJoinPool); - } else { - eval(env, context, expression, args, callback); - } - } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java index 89374d0a94..f9d20dbb19 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java @@ -13,16 +13,16 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; -import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind; -import com.google.devtools.build.lib.query2.engine.ParallelQueryUtils.QueryTask; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.util.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A binary algebraic set operation. @@ -56,40 +56,84 @@ public class BinaryOperatorExpression extends QueryExpression { } @Override - protected <T> void evalImpl( - QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) - throws QueryException, InterruptedException { + public <T> QueryTaskFuture<Void> eval( + QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) { + switch (operator) { + case PLUS: + case UNION: + return evalPlus(operands, env, context, callback); + case MINUS: + case EXCEPT: + return evalMinus(operands, env, context, callback); + case INTERSECT: + case CARET: + return evalIntersect(env, context, callback); + default: + throw new IllegalStateException(operator.toString()); + } + } - if (operator == TokenKind.PLUS || operator == TokenKind.UNION) { - for (QueryExpression operand : operands) { - env.eval(operand, context, callback); - } - return; + /** + * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions + * separately. + * + * <p>N.B. {@code operands.size()} may be {@code 1}. + */ + private static <T> QueryTaskFuture<Void> evalPlus( + ImmutableList<QueryExpression> operands, + QueryEnvironment<T> env, + VariableContext<T> context, + Callback<T> callback) { + ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(operands.size()); + for (QueryExpression operand : operands) { + queryTasks.add(env.eval(operand, context, callback)); } + return env.whenAllSucceed(queryTasks); + } - // Once we have fully evaluated the left-hand side, we can stream-process the right-hand side - // for minus operations. Note that this is suboptimal if the left-hand side results are very - // large compared to the right-hand side. Which is the case is hard to know before evaluating. - // We could consider determining this dynamically, however, by evaluating both the left and - // right hand side partially until one side finishes sooner. - final Set<T> lhsValue = QueryUtil.evalAll(env, context, operands.get(0)); - if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) { - for (int i = 1; i < operands.size(); i++) { - env.eval(operands.get(i), context, - new Callback<T>() { + /** + * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to + * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side separately. + */ + private static <T> QueryTaskFuture<Void> evalMinus( + final ImmutableList<QueryExpression> operands, + final QueryEnvironment<T> env, + final VariableContext<T> context, + final Callback<T> callback) { + QueryTaskFuture<Set<T>> lhsValueFuture = QueryUtil.evalAll(env, context, operands.get(0)); + Function<Set<T>, QueryTaskFuture<Void>> substractAsyncFunction = + new Function<Set<T>, QueryTaskFuture<Void>>() { + @Override + public QueryTaskFuture<Void> apply(Set<T> lhsValue) { + final Set<T> threadSafeLhsValue = Sets.newConcurrentHashSet(lhsValue); + Callback<T> subtractionCallback = new Callback<T>() { + @Override + public void process(Iterable<T> partialResult) { + for (T target : partialResult) { + threadSafeLhsValue.remove(target); + } + } + }; + QueryTaskFuture<Void> rhsEvaluatedFuture = evalPlus( + operands.subList(1, operands.size()), env, context, subtractionCallback); + return env.whenSucceedsCall( + rhsEvaluatedFuture, + new QueryTaskCallable<Void>() { @Override - public void process(Iterable<T> partialResult) - throws QueryException, InterruptedException { - for (T target : partialResult) { - lhsValue.remove(target); - } + public Void call() throws QueryException, InterruptedException { + callback.process(threadSafeLhsValue); + return null; } }); } - callback.process(lhsValue); - return; - } + }; + return env.transformAsync(lhsValueFuture, substractAsyncFunction); + } + private <T> QueryTaskFuture<Void> evalIntersect( + final QueryEnvironment<T> env, + final VariableContext<T> context, + final Callback<T> callback) { // For each right-hand side operand, intersection cannot be performed in a streaming manner; the // entire result of that operand is needed. So, in order to avoid pinning too much in memory at // once, we process each right-hand side operand one at a time and throw away that operand's @@ -97,77 +141,39 @@ public class BinaryOperatorExpression extends QueryExpression { // TODO(bazel-team): Consider keeping just the name / label of the right-hand side results // instead of the potentially heavy-weight instances of type T. This would let us process all // right-hand side operands in parallel without worrying about memory usage. - Preconditions.checkState(operator == TokenKind.INTERSECT || operator == TokenKind.CARET, - operator); + QueryTaskFuture<Set<T>> rollingResultFuture = QueryUtil.evalAll(env, context, operands.get(0)); for (int i = 1; i < operands.size(); i++) { - lhsValue.retainAll(QueryUtil.evalAll(env, context, operands.get(i))); + final int index = i; + Function<Set<T>, QueryTaskFuture<Set<T>>> evalOperandAndIntersectAsyncFunction = + new Function<Set<T>, QueryTaskFuture<Set<T>>>() { + @Override + public QueryTaskFuture<Set<T>> apply(final Set<T> rollingResult) { + final QueryTaskFuture<Set<T>> rhsOperandValueFuture = + QueryUtil.evalAll(env, context, operands.get(index)); + return env.whenSucceedsCall( + rhsOperandValueFuture, + new QueryTaskCallable<Set<T>>() { + @Override + public Set<T> call() throws QueryException, InterruptedException { + rollingResult.retainAll(rhsOperandValueFuture.getIfSuccessful()); + return rollingResult; + } + }); + } + }; + rollingResultFuture = + env.transformAsync(rollingResultFuture, evalOperandAndIntersectAsyncFunction); } - callback.process(lhsValue); - } - - @Override - protected <T> void parEvalImpl( - QueryEnvironment<T> env, - VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - if (operator == TokenKind.PLUS || operator == TokenKind.UNION) { - parEvalPlus(operands, env, context, callback, forkJoinPool); - } else if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) { - parEvalMinus(operands, env, context, callback, forkJoinPool); - } else { - evalImpl(env, context, callback); - } - } - - /** - * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions - * in parallel. - */ - private static <T> void parEvalPlus( - ImmutableList<QueryExpression> operands, - final QueryEnvironment<T> env, - final VariableContext<T> context, - final ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - ArrayList<QueryTask> queryTasks = new ArrayList<>(operands.size()); - for (final QueryExpression operand : operands) { - queryTasks.add(new QueryTask() { - @Override - public void execute() throws QueryException, InterruptedException { - env.eval(operand, context, callback); - } - }); - } - ParallelQueryUtils.executeQueryTasksAndWaitInterruptiblyFailFast(queryTasks, forkJoinPool); - } - - /** - * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to - * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side in parallel. - */ - private static <T> void parEvalMinus( - ImmutableList<QueryExpression> operands, - QueryEnvironment<T> env, - VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - final Set<T> lhsValue = - Sets.newConcurrentHashSet(QueryUtil.evalAll(env, context, operands.get(0))); - ThreadSafeCallback<T> subtractionCallback = new ThreadSafeCallback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - for (T target : partialResult) { - lhsValue.remove(target); - } - } - }; - parEvalPlus( - operands.subList(1, operands.size()), env, context, subtractionCallback, forkJoinPool); - callback.process(lhsValue); + final QueryTaskFuture<Set<T>> resultFuture = rollingResultFuture; + return env.whenSucceedsCall( + resultFuture, + new QueryTaskCallable<Void>() { + @Override + public Void call() throws QueryException, InterruptedException { + callback.process(resultFuture.getIfSuccessful()); + return null; + } + }); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java index d2a2eb0fa8..cbc0ae8d38 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java @@ -19,10 +19,9 @@ import com.google.devtools.build.lib.collect.CompactHashSet; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; - +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A buildfiles(x) query expression, which computes the set of BUILD files and @@ -42,18 +41,17 @@ class BuildFilesFunction implements QueryFunction { } @Override - public <T> void eval( + public <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, VariableContext<T> context, final QueryExpression expression, List<Argument> args, - final Callback<T> callback) - throws QueryException, InterruptedException { + final Callback<T> callback) { final Uniquifier<T> uniquifier = env.createUniquifier(); - env.eval( + return env.eval( args.get(0).getExpression(), context, - new ThreadSafeCallback<T>() { + new Callback<T>() { @Override public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { @@ -67,18 +65,6 @@ class BuildFilesFunction implements QueryFunction { } @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - // 'eval' is written in such a way that it enables parallel evaluation of 'expression'. - eval(env, context, expression, args, callback); - } - - @Override public int getMandatoryArguments() { return 1; } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java index 0f4321145d..51c51fa99b 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java @@ -13,14 +13,17 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.util.BatchCallback; +import com.google.devtools.build.lib.util.ThreadSafeBatchCallback; /** * Query callback to be called by a {@link QueryExpression} when it has part of the computation * result. Assuming the {@code QueryEnvironment} supports it, it would allow the caller * to stream the results. */ -public interface Callback<T> extends BatchCallback<T, QueryException> { +@ThreadSafe +public interface Callback<T> extends ThreadSafeBatchCallback<T, QueryException> { /** * According to the {@link BatchCallback} interface, repeated elements may be passed in here. diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java index 5eca701702..7317a35028 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java @@ -18,10 +18,10 @@ import com.google.common.collect.Sets; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A "deps" query expression, which computes the dependencies of the argument. An optional @@ -53,15 +53,15 @@ final class DepsFunction implements QueryFunction { * Breadth-first search from the arguments. */ @Override - public <T> void eval( + public <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, VariableContext<T> context, final QueryExpression expression, List<Argument> args, - final Callback<T> callback) throws QueryException, InterruptedException { + final Callback<T> callback) { final int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE; final Uniquifier<T> uniquifier = env.createUniquifier(); - env.eval(args.get(0).getExpression(), context, new Callback<T>() { + return env.eval(args.get(0).getExpression(), context, new Callback<T>() { @Override public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { Collection<T> current = Sets.newHashSet(partialResult); @@ -83,15 +83,4 @@ final class DepsFunction implements QueryFunction { } }); } - - @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); - } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java index a31196aba5..85cfe9fddb 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java @@ -20,10 +20,9 @@ import com.google.common.collect.Iterables; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; - +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.Collection; import java.util.List; -import java.util.concurrent.ForkJoinPool; /** * A query expression for user-defined query functions. @@ -46,19 +45,9 @@ public class FunctionExpression extends QueryExpression { } @Override - protected <T> void evalImpl( - QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) - throws QueryException, InterruptedException { - function.eval(env, context, this, args, callback); - } - - @Override - protected <T> void parEvalImpl( - QueryEnvironment<T> env, - VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - function.parEval(env, context, this, args, callback, forkJoinPool); + public <T> QueryTaskFuture<Void> eval( + QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) { + return function.eval(env, context, this, args, callback); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java index 4fa428adb5..1d68573578 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java @@ -17,9 +17,9 @@ import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ForkJoinPool; /** * A label(attr_name, argument) expression, which computes the set of targets @@ -52,16 +52,15 @@ class LabelsFunction implements QueryFunction { } @Override - public <T> void eval( + public <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, VariableContext<T> context, final QueryExpression expression, final List<Argument> args, - final Callback<T> callback) - throws QueryException, InterruptedException { + final Callback<T> callback) { final String attrName = args.get(0).getWord(); final Uniquifier<T> uniquifier = env.createUniquifier(); - env.eval(args.get(1).getExpression(), context, new Callback<T>() { + return env.eval(args.get(1).getExpression(), context, new Callback<T>() { @Override public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { for (T input : partialResult) { @@ -80,15 +79,4 @@ class LabelsFunction implements QueryFunction { } }); } - - @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); - } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java index 64d94da19a..a7c3abeb62 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java @@ -13,6 +13,8 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.common.base.Function; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.Collection; import java.util.Set; import java.util.regex.Pattern; @@ -64,15 +66,24 @@ class LetExpression extends QueryExpression { } @Override - protected <T> void evalImpl( - QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) - throws QueryException, InterruptedException { + public <T> QueryTaskFuture<Void> eval( + final QueryEnvironment<T> env, + final VariableContext<T> context, + final Callback<T> callback) { if (!NAME_PATTERN.matcher(varName).matches()) { - throw new QueryException(this, "invalid variable name '" + varName + "' in let expression"); + return env.immediateFailedFuture( + new QueryException(this, "invalid variable name '" + varName + "' in let expression")); } - Set<T> varValue = QueryUtil.evalAll(env, context, varExpr); - VariableContext<T> bodyContext = VariableContext.with(context, varName, varValue); - env.eval(bodyExpr, bodyContext, callback); + QueryTaskFuture<Set<T>> varValueFuture = QueryUtil.evalAll(env, context, varExpr); + Function<Set<T>, QueryTaskFuture<Void>> evalBodyAsyncFunction = + new Function<Set<T>, QueryTaskFuture<Void>>() { + @Override + public QueryTaskFuture<Void> apply(Set<T> varValue) { + VariableContext<T> bodyContext = VariableContext.with(context, varName, varValue); + return env.eval(bodyExpr, bodyContext, callback); + } + }; + return env.transformAsync(varValueFuture, evalBodyAsyncFunction); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java index 311a6afff5..80b912f6d7 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java @@ -16,10 +16,9 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.devtools.build.lib.collect.CompactHashSet; -import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A loadfiles(x) query expression, which computes the set of .bzl files @@ -38,15 +37,14 @@ class LoadFilesFunction implements QueryEnvironment.QueryFunction { } @Override - public <T> void eval( + public <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, VariableContext<T> context, final QueryExpression expression, List<QueryEnvironment.Argument> args, - final Callback<T> callback) - throws QueryException, InterruptedException { + final Callback<T> callback) { final Uniquifier<T> uniquifier = env.createUniquifier(); - env.eval( + return env.eval( args.get(0).getExpression(), context, new Callback<T>() { @@ -67,17 +65,6 @@ class LoadFilesFunction implements QueryEnvironment.QueryFunction { } @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); - } - - @Override public int getMandatoryArguments() { return 1; } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java index 5d21c874be..50708d6816 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java @@ -46,7 +46,7 @@ public abstract class OutputFormatterCallback<T> implements Callback<T> { * disambiguate between real interruptions or IO Exceptions. */ @Override - public final void process(Iterable<T> partialResult) throws QueryException, InterruptedException { + public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { try { processOutput(partialResult); } catch (IOException e) { diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java deleted file mode 100644 index 6e22709deb..0000000000 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.query2.engine; - -import com.google.common.collect.Iterables; -import com.google.devtools.build.lib.concurrent.MoreFutures; -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; -import java.util.concurrent.Future; - -/** Several utilities to aid in writing {@link QueryExpression#parEvalImpl} implementations. */ -public class ParallelQueryUtils { - /** - * Encapsulation of a subtask of parallel evaluation of a {@link QueryExpression}. See - * {@link #executeQueryTasksAndWaitInterruptiblyFailFast}. - */ - @ThreadSafe - public interface QueryTask { - void execute() throws QueryException, InterruptedException; - } - - /** - * Executes the given {@link QueryTask}s using the given {@link ForkJoinPool} and interruptibly - * waits for their completion. Throws the first {@link QueryException} encountered during parallel - * execution or an {@link InterruptedException} if the calling thread is interrupted. - * - * <p>These "fail-fast" semantics are desirable to avoid doing unneeded work when evaluating - * multiple {@link QueryTask}s in parallel: if serial execution of the tasks would result in a - * {@link QueryException} then we want parallel execution to do so as well, but there's no need to - * continue waiting for completion of the tasks after at least one of them results in a - * {@link QueryException}. - */ - public static void executeQueryTasksAndWaitInterruptiblyFailFast( - List<QueryTask> queryTasks, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - int numTasks = queryTasks.size(); - if (numTasks == 1) { - Iterables.getOnlyElement(queryTasks).execute(); - return; - } - FailFastCountDownLatch failFastLatch = new FailFastCountDownLatch(numTasks); - ArrayList<QueryTaskForkJoinTask> forkJoinTasks = new ArrayList<>(numTasks); - for (QueryTask queryTask : queryTasks) { - QueryTaskForkJoinTask forkJoinTask = adaptAsForkJoinTask(queryTask, failFastLatch); - forkJoinTasks.add(forkJoinTask); - @SuppressWarnings("unused") - Future<?> possiblyIgnoredError = forkJoinPool.submit(forkJoinTask); - } - failFastLatch.await(); - try { - MoreFutures.waitForAllInterruptiblyFailFast(forkJoinTasks); - } catch (ExecutionException e) { - throw rethrowCause(e); - } - } - - private static QueryTaskForkJoinTask adaptAsForkJoinTask( - QueryTask queryTask, - FailFastCountDownLatch failFastLatch) { - return new QueryTaskForkJoinTask(queryTask, failFastLatch); - } - - private static RuntimeException rethrowCause(ExecutionException e) - throws QueryException, InterruptedException { - Throwable cause = e.getCause(); - if (cause instanceof ParallelRuntimeException) { - ((ParallelRuntimeException) cause).rethrow(); - } - throw new IllegalStateException(e); - } - - /** - * Wrapper around a {@link CountDownLatch} with initial count {@code n} that counts down once on - * "success" and {@code n} times on "failure". - * - * <p>This can be used in a concurrent context to wait until either {@code n} tasks are successful - * or at least one of them fails. - */ - @ThreadSafe - private static class FailFastCountDownLatch { - private final int n; - private final CountDownLatch completionLatch; - - private FailFastCountDownLatch(int n) { - this.n = n; - this.completionLatch = new CountDownLatch(n); - } - - private void await() throws InterruptedException { - completionLatch.await(); - } - - private void countDown(boolean success) { - if (success) { - completionLatch.countDown(); - } else { - for (int i = 0; i < n; i++) { - completionLatch.countDown(); - } - } - } - } - - // ForkJoinTask#adapt(Callable) wraps thrown checked exceptions as RuntimeExceptions. We avoid - // having to think about that messiness (which is inconsistent with other Future implementations) - // by having our own ForkJoinTask subclass and managing checked exceptions ourselves. - @ThreadSafe - private static class QueryTaskForkJoinTask extends ForkJoinTask<Void> { - private final QueryTask queryTask; - private final FailFastCountDownLatch completionLatch; - - private QueryTaskForkJoinTask(QueryTask queryTask, FailFastCountDownLatch completionLatch) { - this.queryTask = queryTask; - this.completionLatch = completionLatch; - } - - @Override - public Void getRawResult() { - return null; - } - - @Override - protected void setRawResult(Void value) { - } - - @Override - protected boolean exec() { - boolean successful = false; - try { - queryTask.execute(); - successful = true; - return true; - } catch (QueryException queryException) { - throw new ParallelRuntimeQueryException(queryException); - } catch (InterruptedException interruptedException) { - throw new ParallelInterruptedQueryException(interruptedException); - } finally { - completionLatch.countDown(successful); - } - } - } - - private abstract static class ParallelRuntimeException extends RuntimeException { - abstract void rethrow() throws QueryException, InterruptedException; - } - - private static class ParallelRuntimeQueryException extends ParallelRuntimeException { - private final QueryException queryException; - - private ParallelRuntimeQueryException(QueryException queryException) { - this.queryException = queryException; - } - - @Override - void rethrow() throws QueryException, InterruptedException { - throw queryException; - } - } - - private static class ParallelInterruptedQueryException extends ParallelRuntimeException { - private final InterruptedException interruptedException; - - private ParallelInterruptedQueryException(InterruptedException interruptedException) { - this.interruptedException = interruptedException; - } - - @Override - void rethrow() throws QueryException, InterruptedException { - throw interruptedException; - } - } -} diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java index 879c9c1ddb..0c11eec8f2 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java @@ -13,11 +13,13 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.common.base.Function; import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Callable; import javax.annotation.Nonnull; /** @@ -91,16 +93,14 @@ public interface QueryEnvironment<T> { /** A user-defined query function. */ interface QueryFunction { - /** - * Name of the function as it appears in the query language. - */ + /** Name of the function as it appears in the query language. */ String getName(); /** * The number of arguments that are required. The rest is optional. * - * <p>This should be greater than or equal to zero and at smaller than or equal to the length - * of the list returned by {@link #getArgumentTypes}. + * <p>This should be greater than or equal to zero and at smaller than or equal to the length of + * the list returned by {@link #getArgumentTypes}. */ int getMandatoryArguments(); @@ -108,34 +108,21 @@ public interface QueryEnvironment<T> { Iterable<ArgumentType> getArgumentTypes(); /** - * Called when a user-defined function is to be evaluated. + * Returns a {@link QueryTaskFuture} representing the asynchronous application of this + * {@link QueryFunction} to the given {@code args}, feeding the results to the given + * {@code callback}. * * @param env the query environment this function is evaluated in. * @param expression the expression being evaluated. - * @param args the input arguments. These are type-checked against the specification returned - * by {@link #getArgumentTypes} and {@link #getMandatoryArguments} - */ - <T> void eval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - Callback<T> callback) throws QueryException, InterruptedException; - - /** - * Same as {@link #eval(QueryEnvironment, VariableContext, QueryExpression, List, Callback)}, - * except that this {@link QueryFunction} may use {@code forkJoinPool} to achieve - * parallelism. - * - * <p>The caller must ensure that {@code env} is thread safe. + * @param args the input arguments. These are type-checked against the specification returned by + * {@link #getArgumentTypes} and {@link #getMandatoryArguments} */ - <T> void parEval( + <T> QueryTaskFuture<Void> eval( QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expression, List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException; + Callback<T> callback); } /** @@ -156,18 +143,8 @@ public interface QueryEnvironment<T> { * Invokes {@code callback} with the set of target nodes in the graph for the specified target * pattern, in 'blaze build' syntax. */ - void getTargetsMatchingPattern(QueryExpression owner, String pattern, Callback<T> callback) - throws QueryException, InterruptedException; - - /** - * Same as {@link #getTargetsMatchingPattern}, but optionally making use of the given - * {@link ForkJoinPool} to achieve parallelism. - */ - void getTargetsMatchingPatternPar( - QueryExpression owner, - String pattern, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException; + QueryTaskFuture<Void> getTargetsMatchingPattern( + QueryExpression owner, String pattern, Callback<T> callback); /** Ensures the specified target exists. */ // NOTE(bazel-team): this method is left here as scaffolding from a previous refactoring. It may @@ -203,14 +180,159 @@ public interface QueryEnvironment<T> { Set<T> getNodesOnPath(T from, T to) throws InterruptedException; /** - * Eval an expression {@code expr} and pass the results to the {@code callback}. + * Returns a {@link QueryTaskFuture} representing the asynchronous evaluation of the given + * {@code expr} and passing of the results to the given {@code callback}. * * <p>Note that this method should guarantee that the callback does not see repeated elements. + * * @param expr The expression to evaluate * @param callback The caller callback to notify when results are available */ - void eval(QueryExpression expr, VariableContext<T> context, Callback<T> callback) - throws QueryException, InterruptedException; + QueryTaskFuture<Void> eval( + QueryExpression expr, VariableContext<T> context, Callback<T> callback); + + /** + * An asynchronous computation of part of a query evaluation. + * + * <p>A {@link QueryTaskFuture} can only be produced from scratch via {@link #eval}, + * {@link #executeAsync}, {@link #immediateSuccessfulFuture}, {@link #immediateFailedFuture}, and + * {@link #immediateCancelledFuture}. + * + * <p>Combined with the helper methods like {@link #whenSucceedsCall} below, this is very similar + * to Guava's {@link ListenableFuture}. + * + * <p>This class is deliberately opaque; the only ways to compose/use {@link #QueryTaskFuture} + * instances are the helper methods like {@link #whenSucceedsCall} below. A crucial consequence of + * this is there is no way for a {@link QueryExpression} or {@link QueryFunction} implementation + * to block on the result of a {@link #QueryTaskFuture}. This eliminates a large class of + * deadlocks by design! + */ + @ThreadSafe + public abstract class QueryTaskFuture<T> { + // We use a public abstract class with a private constructor so that this type is visible to all + // the query codebase, but yet the only possible implementation is under our control in this + // file. + private QueryTaskFuture() {} + + /** + * If this {@link QueryTasksFuture}'s encapsulated computation is currently complete and + * successful, returns the result. This method is intended to be used in combination with + * {@link #whenSucceedsCall}. + * + * <p>See the javadoc for the various helper methods that produce {@link QueryTasksFuture} for + * the precise definition of "successful". + */ + public abstract T getIfSuccessful(); + } + + /** + * Returns a {@link QueryTaskFuture} representing the successful computation of {@code value}. + * + * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of + * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and + * {@link QueryTaskFuture#getIfSuccessful}. + */ + abstract <R> QueryTaskFuture<R> immediateSuccessfulFuture(R value); + + /** + * Returns a {@link QueryTaskFuture} representing a computation that was unsuccessful because of + * {@code e}. + * + * <p>The returned {@link QueryTaskFuture} is considered "unsuccessful" for purposes of + * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and + * {@link QueryTaskFuture#getIfSuccessful}. + */ + abstract <R> QueryTaskFuture<R> immediateFailedFuture(QueryException e); + + /** + * Returns a {@link QueryTaskFuture} representing a cancelled computation. + * + * <p>The returned {@link QueryTaskFuture} is considered "unsuccessful" for purposes of + * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and + * {@link QueryTaskFuture#getIfSuccessful}. + */ + abstract <R> QueryTaskFuture<R> immediateCancelledFuture(); + + /** A {@link ThreadSafe} {@link Callable} for computations during query evaluation. */ + @ThreadSafe + public interface QueryTaskCallable<T> extends Callable<T> { + /** + * Returns the computed value or throws a {@link QueryException} on failure or a + * {@link InterruptedException} on interruption. + */ + @Override + T call() throws QueryException, InterruptedException; + } + + /** + * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being + * performed asynchronously. + * + * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of + * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and + * {@link QueryTaskFuture#getIfSuccessful} iff {@code callable#call} does not throw an exception. + */ + <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable); + + /** + * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being + * performed after the successful completion of the computation encapsulated by the given + * {@code future} has completed successfully. + * + * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of + * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and + * {@link QueryTaskFuture#getIfSuccessful} iff {@code future} is successful and + * {@code callable#call} does not throw an exception. + */ + <R> QueryTaskFuture<R> whenSucceedsCall(QueryTaskFuture<?> future, QueryTaskCallable<R> callable); + + /** + * Returns a {@link QueryTaskFuture} representing the successful completion of all the + * computations encapsulated by the given {@code futures}. + * + * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of + * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and + * {@link QueryTaskFuture#getIfSuccessful} iff all of the given computations are "successful". + */ + QueryTaskFuture<Void> whenAllSucceed(Iterable<? extends QueryTaskFuture<?>> futures); + + /** + * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being + * performed after the successful completion of all the computations encapsulated by the given + * {@code futures}. + * + * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of + * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and + * {@link QueryTaskFuture#getIfSuccessful} iff all of the given computations are "successful" and + * {@code callable#call} does not throw an exception. + */ + <R> QueryTaskFuture<R> whenAllSucceedCall( + Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable); + + /** + * Returns a {@link QueryTaskFuture} representing the asynchronous application of the given + * {@code function} to the value produced by the computation encapsulated by the given + * {@code future}. + * + * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of + * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and + * {@link QueryTaskFuture#getIfSuccessful} iff {@code} future is "successful". + */ + <T1, T2> QueryTaskFuture<T2> transformAsync( + QueryTaskFuture<T1> future, Function<T1, QueryTaskFuture<T2>> function); + + /** + * The sole package-protected subclass of {@link QueryTaskFuture}. + * + * <p>Do not subclass this class; it's an implementation detail. {@link QueryExpression} and + * {@link QueryFunction} implementations should use {@link #eval} and {@link #executeAsync} to get + * access to {@link QueryTaskFuture} instances and the then use the helper methods like + * {@link #whenSucceedsCall} to transform them. + */ + abstract class QueryTaskFutureImplBase<T> extends QueryTaskFuture<T> { + protected QueryTaskFutureImplBase() { + } + } /** * Creates a Uniquifier for use in a {@code QueryExpression}. Note that the usage of this an @@ -372,9 +494,6 @@ public interface QueryEnvironment<T> { Set<QueryVisibility<T>> getVisibility(T from) throws QueryException, InterruptedException; } - /** Returns the {@link QueryExpressionEvalListener} that this {@link QueryEnvironment} uses. */ - QueryExpressionEvalListener<T> getEvalListener(); - /** List of the default query functions. */ List<QueryFunction> DEFAULT_QUERY_FUNCTIONS = ImmutableList.of( diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java index e35e9e4807..920722db4b 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java @@ -14,9 +14,8 @@ package com.google.devtools.build.lib.query2.engine; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; - +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.Collection; -import java.util.concurrent.ForkJoinPool; /** * Base class for expressions in the Blaze query language, revision 2. @@ -59,9 +58,9 @@ public abstract class QueryExpression { protected QueryExpression() {} /** - * Evaluates this query in the specified environment, and notifies the callback with a result. - * Note that it is allowed to notify the callback with partial results instead of just one final - * result. + * Returns a {@link QueryTaskFuture} representing the asynchronous evaluation of this query in the + * specified environment, notifying the callback with a result. Note that it is allowed to notify + * the callback with partial results instead of just one final result. * * <p>Failures resulting from evaluation of an ill-formed query cause * QueryException to be thrown. @@ -71,45 +70,10 @@ public abstract class QueryExpression { * thrown. If disabled, evaluation will stumble on to produce a (possibly * inaccurate) result, but a result nonetheless. */ - public final <T> void eval( - QueryEnvironment<T> env, - VariableContext<T> context, - Callback<T> callback) throws QueryException, InterruptedException { - env.getEvalListener().onEval(this, env, context, callback); - evalImpl(env, context, callback); - } - - protected abstract <T> void evalImpl( - QueryEnvironment<T> env, - VariableContext<T> context, - Callback<T> callback) throws QueryException, InterruptedException; - - /** - * Evaluates this query in the specified environment, as in - * {@link #eval(QueryEnvironment, VariableContext, Callback)}, using {@code forkJoinPool} to - * achieve parallelism. - * - * <p>The caller must ensure that {@code env} is thread safe. - */ - @ThreadSafe - public final <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - env.getEvalListener().onParEval(this, env, context, callback, forkJoinPool); - parEvalImpl(env, context, callback, forkJoinPool); - } - - protected <T> void parEvalImpl( + public abstract <T> QueryTaskFuture<Void> eval( QueryEnvironment<T> env, VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - evalImpl(env, context, callback); - } + Callback<T> callback); /** * Collects all target patterns that are referenced anywhere within this query expression and adds diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java deleted file mode 100644 index e6bdaef7a9..0000000000 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.query2.engine; - -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import java.util.concurrent.ForkJoinPool; - -/** Listener for calls to the internal methods of {@link QueryExpression} used for evaluation. */ -@ThreadSafe -public interface QueryExpressionEvalListener<T> { - /** Called right before {@link QueryExpression#evalImpl} is called. */ - void onEval( - QueryExpression expr, - QueryEnvironment<T> env, - VariableContext<T> context, - Callback<T> callback); - - /** Called right before {@link QueryExpression#parEvalImpl} is called. */ - void onParEval( - QueryExpression expr, - QueryEnvironment<T> env, - VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool); - - /** A {@link QueryExpressionEvalListener} that does nothing. */ - class NullListener<T> implements QueryExpressionEvalListener<T> { - private static final NullListener<?> INSTANCE = new NullListener<>(); - - private NullListener() { - } - - @SuppressWarnings("unchecked") - public static <T> NullListener<T> instance() { - return (NullListener<T>) INSTANCE; - } - - @Override - public void onEval( - QueryExpression expr, - QueryEnvironment<T> env, - VariableContext<T> context, - Callback<T> callback) { - } - - @Override - public void onParEval( - QueryExpression expr, - QueryEnvironment<T> env, - VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) { - } - } -} - diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java index 2d7be2ed74..afb4192128 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java @@ -13,11 +13,13 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; - import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; +import com.google.common.collect.Sets; import com.google.devtools.build.lib.collect.CompactHashSet; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.Collections; import java.util.Set; @@ -28,17 +30,18 @@ public final class QueryUtil { /** A {@link Callback} that can aggregate all the partial results into one set. */ public interface AggregateAllCallback<T> extends Callback<T> { + /** Returns a (mutable) set of all the results. */ Set<T> getResult(); } - /** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */ + /** A {@link OutputFormatterCallback} that is also a {@link AggregateAllCallback}. */ public abstract static class AggregateAllOutputFormatterCallback<T> - extends OutputFormatterCallback<T> implements AggregateAllCallback<T> { + extends ThreadSafeOutputFormatterCallback<T> implements AggregateAllCallback<T> { } private static class AggregateAllOutputFormatterCallbackImpl<T> extends AggregateAllOutputFormatterCallback<T> { - private final Set<T> result = CompactHashSet.create(); + private final Set<T> result = Sets.newConcurrentHashSet(); @Override public final void processOutput(Iterable<T> partialResult) { @@ -51,65 +54,64 @@ public final class QueryUtil { } } + private static class OrderedAggregateAllOutputFormatterCallbackImpl<T> + extends AggregateAllOutputFormatterCallback<T> { + private final Set<T> result = CompactHashSet.create(); + + @Override + public final synchronized void processOutput(Iterable<T> partialResult) { + Iterables.addAll(result, partialResult); + } + + @Override + public synchronized Set<T> getResult() { + return result; + } + } + /** - * Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial - * results into one set. - * - * <p>Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with - * {@link #newAggregateAllCallback}. + * Returns a fresh {@link AggregateAllOutputFormatterCallback} instance whose + * {@link AggregateAllCallback#getResult} returns all the elements of the result in the order they + * were processed. */ public static <T> AggregateAllOutputFormatterCallback<T> - newAggregateAllOutputFormatterCallback() { - return new AggregateAllOutputFormatterCallbackImpl<>(); + newOrderedAggregateAllOutputFormatterCallback() { + return new OrderedAggregateAllOutputFormatterCallbackImpl<>(); } - /** - * Returns a fresh {@link AggregateAllCallback}. - * - * <p>Intended to be used by {@link QueryExpression} implementations; contrast with - * {@link #newAggregateAllOutputFormatterCallback}. - */ + /** Returns a fresh {@link AggregateAllCallback} instance. */ public static <T> AggregateAllCallback<T> newAggregateAllCallback() { return new AggregateAllOutputFormatterCallbackImpl<>(); } /** - * Fully evaluate a {@code QueryExpression} and return a set with all the results. + * Returns a {@link QueryTaskFuture} representing the evaluation of {@code expr} as a (mutable) + * {@link Set} comprised of all the results. * * <p>Should only be used by QueryExpressions when it is the only way of achieving correctness. */ - public static <T> Set<T> evalAll( - QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr) - throws QueryException, InterruptedException { - AggregateAllCallback<T> callback = newAggregateAllCallback(); - env.eval(expr, context, callback); - return callback.getResult(); + public static <T> QueryTaskFuture<Set<T>> evalAll( + QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr) { + final AggregateAllCallback<T> callback = newAggregateAllCallback(); + return env.whenSucceedsCall( + env.eval(expr, context, callback), + new QueryTaskCallable<Set<T>>() { + @Override + public Set<T> call() { + return callback.getResult(); + } + }); } /** A trivial {@link Uniquifier} base class. */ - public abstract static class AbstractUniquifier<T, K> - extends AbstractUniquifierBase<T, K> { - private final CompactHashSet<K> alreadySeen = CompactHashSet.create(); + public abstract static class AbstractUniquifier<T, K> implements Uniquifier<T> { + private final Set<K> alreadySeen; - @Override - public final boolean unique(T element) { - return alreadySeen.add(extractKey(element)); + protected AbstractUniquifier() { + this(/*concurrencyLevel=*/ 1); } - /** - * Extracts an unique key that can be used to dedupe the given {@code element}. - * - * <p>Depending on the choice of {@code K}, this enables potential memory optimizations. - */ - protected abstract K extractKey(T element); - } - - /** A trivial {@link ThreadSafeUniquifier} base class. */ - public abstract static class AbstractThreadSafeUniquifier<T, K> - extends AbstractUniquifierBase<T, K> implements ThreadSafeUniquifier<T> { - private final Set<K> alreadySeen; - - protected AbstractThreadSafeUniquifier(int concurrencyLevel) { + protected AbstractUniquifier(int concurrencyLevel) { this.alreadySeen = Collections.newSetFromMap( new MapMaker().concurrencyLevel(concurrencyLevel).<K, Boolean>makeMap()); } @@ -119,15 +121,6 @@ public final class QueryUtil { return alreadySeen.add(extractKey(element)); } - /** - * Extracts an unique key that can be used to dedupe the given {@code element}. - * - * <p>Depending on the choice of {@code K}, this enables potential memory optimizations. - */ - protected abstract K extractKey(T element); - } - - private abstract static class AbstractUniquifierBase<T, K> implements Uniquifier<T> { @Override public final ImmutableList<T> unique(Iterable<T> newElements) { ImmutableList.Builder<T> result = ImmutableList.builder(); @@ -138,5 +131,12 @@ public final class QueryUtil { } return result.build(); } + + /** + * Extracts an unique key that can be used to dedupe the given {@code element}. + * + * <p>Depending on the choice of {@code K}, this enables potential memory optimizations. + */ + protected abstract K extractKey(T element); } -} +}
\ No newline at end of file diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java index 7d691c0b04..82faf72538 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java @@ -13,12 +13,14 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.common.base.Function; +import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; - +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.List; import java.util.Set; @@ -54,16 +56,31 @@ public final class RdepsFunction extends AllRdepsFunction { * towards the universe while staying within the transitive closure. */ @Override - public <T> void eval(QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, Callback<T> callback) - throws QueryException, - InterruptedException { - Set<T> universeValue = QueryUtil.evalAll(env, context, args.get(0).getExpression()); - env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE); - - Predicate<T> universe = Predicates.in(env.getTransitiveClosure(universeValue)); - eval(env, context, args.subList(1, args.size()), callback, universe); + public <T> QueryTaskFuture<Void> eval( + final QueryEnvironment<T> env, + final VariableContext<T> context, + final QueryExpression expression, + final List<Argument> args, + final Callback<T> callback) { + QueryTaskFuture<Set<T>> universeValueFuture = + QueryUtil.evalAll(env, context, args.get(0).getExpression()); + Function<Set<T>, QueryTaskFuture<Void>> evalInUniverseAsyncFunction = + new Function<Set<T>, QueryTaskFuture<Void>>() { + @Override + public QueryTaskFuture<Void> apply(Set<T> universeValue) { + Predicate<T> universe; + try { + env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE); + universe = Predicates.in(env.getTransitiveClosure(universeValue)); + } catch (InterruptedException e) { + return env.immediateCancelledFuture(); + } catch (QueryException e) { + return env.immediateFailedFuture(e); + } + return RdepsFunction.this.eval( + env, context, args.subList(1, args.size()), callback, Optional.of(universe)); + } + }; + return env.transformAsync(universeValueFuture, evalInUniverseAsyncFunction); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java index 9dc75a43e1..6b182ee7b1 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java @@ -18,9 +18,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; - +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.List; -import java.util.concurrent.ForkJoinPool; import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; @@ -33,25 +32,24 @@ public abstract class RegexFilterExpression implements QueryFunction { } @Override - public <T> void eval( + public <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expression, final List<Argument> args, - Callback<T> callback) - throws QueryException, InterruptedException { + Callback<T> callback) { String rawPattern = getPattern(args); final Pattern compiledPattern; try { compiledPattern = Pattern.compile(rawPattern); } catch (PatternSyntaxException e) { - throw new QueryException( + return env.immediateFailedFuture(new QueryException( expression, String.format( "illegal '%s' pattern regexp '%s': %s", getName(), rawPattern, - e.getMessage())); + e.getMessage()))); } // Note that Patttern#matcher is thread-safe and so this Predicate can safely be used @@ -68,21 +66,10 @@ public abstract class RegexFilterExpression implements QueryFunction { } }; - env.eval( + return env.eval( Iterables.getLast(args).getExpression(), context, - filteredCallback(callback, matchFilter)); - } - - @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + new FilteredCallback<>(callback, matchFilter)); } /** @@ -111,21 +98,6 @@ public abstract class RegexFilterExpression implements QueryFunction { protected abstract String getPattern(List<Argument> args); - /** - * Returns a new {@link Callback} that forwards values that satisfies the given {@link Predicate} - * to the given {@code parentCallback}. - * - * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} iff - * {@code parentCallback} is as well. - */ - private static <T> Callback<T> filteredCallback( - final Callback<T> parentCallback, - final Predicate<T> retainIfTrue) { - return (parentCallback instanceof ThreadSafeCallback) - ? new ThreadSafeFilteredCallback<>((ThreadSafeCallback<T>) parentCallback, retainIfTrue) - : new FilteredCallback<>(parentCallback, retainIfTrue); - } - private static class FilteredCallback<T> implements Callback<T> { private final Callback<T> parentCallback; private final Predicate<T> retainIfTrue; @@ -148,12 +120,4 @@ public abstract class RegexFilterExpression implements QueryFunction { return "filtered parentCallback of : " + retainIfTrue; } } - - private static class ThreadSafeFilteredCallback<T> - extends FilteredCallback<T> implements ThreadSafeCallback<T> { - private ThreadSafeFilteredCallback( - ThreadSafeCallback<T> parentCallback, Predicate<T> retainIfTrue) { - super(parentCallback, retainIfTrue); - } - } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java index ac4b460f40..e1eadf3aa5 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java @@ -14,7 +14,8 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.base.Joiner; - +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; +import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -46,12 +47,13 @@ class SetExpression extends QueryExpression { } @Override - protected <T> void evalImpl( - QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) - throws QueryException, InterruptedException { + public <T> QueryTaskFuture<Void> eval( + QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) { + ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(words.size()); for (TargetLiteral expr : words) { - env.eval(expr, context, callback); + queryTasks.add(env.eval(expr, context, callback)); } + return env.whenAllSucceed(queryTasks); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java index 8dc0442468..4b07a99f07 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java @@ -19,8 +19,9 @@ import com.google.common.collect.Iterables; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.List; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -49,36 +50,37 @@ class SomeFunction implements QueryFunction { } @Override - public <T> void eval( + public <T> QueryTaskFuture<Void> eval( QueryEnvironment<T> env, VariableContext<T> context, - QueryExpression expression, + final QueryExpression expression, List<Argument> args, - final Callback<T> callback) throws QueryException, InterruptedException { + final Callback<T> callback) { final AtomicBoolean someFound = new AtomicBoolean(false); - env.eval(args.get(0).getExpression(), context, new Callback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - if (someFound.get() || Iterables.isEmpty(partialResult)) { - return; - } - callback.process(ImmutableSet.of(partialResult.iterator().next())); - someFound.set(true); - } - }); - if (!someFound.get()) { - throw new QueryException(expression, "argument set is empty"); - } - } - - @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + QueryTaskFuture<Void> operandEvalFuture = env.eval( + args.get(0).getExpression(), + context, + new Callback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + if (someFound.get() || Iterables.isEmpty(partialResult)) { + return; + } + callback.process(ImmutableSet.of(partialResult.iterator().next())); + someFound.set(true); + } + }); + return env.whenSucceedsCall( + operandEvalFuture, + new QueryTaskCallable<Void>() { + @Override + public Void call() throws QueryException { + if (!someFound.get()) { + throw new QueryException(expression, "argument set is empty"); + } + return null; + } + }); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java index 2d0df0ef59..229863c79a 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java @@ -20,10 +20,10 @@ import com.google.common.collect.Sets.SetView; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; - +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A somepath(x, y) query expression, which computes the set of nodes @@ -51,50 +51,52 @@ class SomePathFunction implements QueryFunction { } @Override - public <T> void eval( - QueryEnvironment<T> env, + public <T> QueryTaskFuture<Void> eval( + final QueryEnvironment<T> env, VariableContext<T> context, - QueryExpression expression, + final QueryExpression expression, List<Argument> args, - final Callback<T> callback) throws QueryException, InterruptedException { - Set<T> fromValue = QueryUtil.evalAll(env, context, args.get(0).getExpression()); - Set<T> toValue = QueryUtil.evalAll(env, context, args.get(1).getExpression()); + final Callback<T> callback) { + final QueryTaskFuture<Set<T>> fromValueFuture = + QueryUtil.evalAll(env, context, args.get(0).getExpression()); + final QueryTaskFuture<Set<T>> toValueFuture = + QueryUtil.evalAll(env, context, args.get(1).getExpression()); - // Implementation strategy: for each x in "from", compute its forward - // transitive closure. If it intersects "to", then do a path search from x - // to an arbitrary node in the intersection, and return the path. This - // avoids computing the full transitive closure of "from" in some cases. + return env.whenAllSucceedCall( + ImmutableList.of(fromValueFuture, toValueFuture), + new QueryTaskCallable<Void>() { + @Override + public Void call() throws QueryException, InterruptedException { + // Implementation strategy: for each x in "from", compute its forward + // transitive closure. If it intersects "to", then do a path search from x + // to an arbitrary node in the intersection, and return the path. This + // avoids computing the full transitive closure of "from" in some cases. - env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE); + Set<T> fromValue = fromValueFuture.getIfSuccessful(); + Set<T> toValue = toValueFuture.getIfSuccessful(); - // This set contains all nodes whose TC does not intersect "toValue". - Uniquifier<T> uniquifier = env.createUniquifier(); + env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE); - for (T x : uniquifier.unique(fromValue)) { - Set<T> xtc = env.getTransitiveClosure(ImmutableSet.of(x)); - SetView<T> result; - if (xtc.size() > toValue.size()) { - result = Sets.intersection(toValue, xtc); - } else { - result = Sets.intersection(xtc, toValue); - } - if (!result.isEmpty()) { - callback.process(env.getNodesOnPath(x, result.iterator().next())); - return; - } - uniquifier.unique(xtc); - } - callback.process(ImmutableSet.<T>of()); - } + // This set contains all nodes whose TC does not intersect "toValue". + Uniquifier<T> uniquifier = env.createUniquifier(); - @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + for (T x : uniquifier.unique(fromValue)) { + Set<T> xtc = env.getTransitiveClosure(ImmutableSet.of(x)); + SetView<T> result; + if (xtc.size() > toValue.size()) { + result = Sets.intersection(toValue, xtc); + } else { + result = Sets.intersection(xtc, toValue); + } + if (!result.isEmpty()) { + callback.process(env.getNodesOnPath(x, result.iterator().next())); + return null; + } + uniquifier.unique(xtc); + } + callback.process(ImmutableSet.<T>of()); + return null; + } + }); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java index eda505a7ce..bb67e93ad5 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java @@ -14,7 +14,6 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.base.Predicate; -import java.util.concurrent.ForkJoinPool; /** * The environment of a Blaze query which supports predefined streaming operations. @@ -24,22 +23,19 @@ import java.util.concurrent.ForkJoinPool; public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> { /** Retrieve and process all reverse dependencies of given expression in a streaming manner. */ - void getAllRdeps( + QueryTaskFuture<Void> getAllRdeps( QueryExpression expression, Predicate<T> universe, VariableContext<T> context, Callback<T> callback, - int depth) - throws QueryException, InterruptedException; + int depth); /** * Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound, making use of the * provided {@code forkJoinPool}. */ - void getAllRdepsUnboundedParallel( + QueryTaskFuture<Void> getAllRdepsUnboundedParallel( QueryExpression expression, VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException; + Callback<T> callback); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java new file mode 100644 index 0000000000..68a79338b7 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java @@ -0,0 +1,58 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.query2.engine; + +import java.io.IOException; +import javax.annotation.Nullable; + +/** + * A {@link ThreadSafeOutputFormatterCallback} wrapper around a {@link OutputFormatterCallback} + * delegate. + */ +public final class SynchronizedDelegatingOutputFormatterCallback<T> + extends ThreadSafeOutputFormatterCallback<T> { + private final OutputFormatterCallback<T> delegate; + + public SynchronizedDelegatingOutputFormatterCallback(OutputFormatterCallback<T> delegate) { + this.delegate = delegate; + } + + @Override + public synchronized void start() throws IOException { + delegate.start(); + } + + @Override + public synchronized void close(boolean failFast) throws InterruptedException, IOException { + delegate.close(failFast); + } + + @Override + public synchronized void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + delegate.process(partialResult); + } + + @Override + public synchronized void processOutput(Iterable<T> partialResult) + throws IOException, InterruptedException { + delegate.processOutput(partialResult); + } + + @Override + @Nullable + public IOException getIoException() { + return delegate.getIoException(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java index aeace9aa70..733bffb065 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java @@ -13,11 +13,10 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.util.Preconditions; - import java.util.Collection; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A literal set of targets, using 'blaze build' syntax. Or, a reference to a @@ -45,38 +44,31 @@ public final class TargetLiteral extends QueryExpression { return LetExpression.isValidVarReference(pattern); } - private <T> void evalVarReference(VariableContext<T> context, Callback<T> callback) - throws QueryException, InterruptedException { + private <T> QueryTaskFuture<Void> evalVarReference( + QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) { String varName = LetExpression.getNameFromReference(pattern); Set<T> value = context.get(varName); if (value == null) { - throw new QueryException(this, "undefined variable '" + varName + "'"); + return env.immediateFailedFuture( + new QueryException(this, "undefined variable '" + varName + "'")); } - callback.process(value); - } - - @Override - protected <T> void evalImpl( - QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) - throws QueryException, InterruptedException { - if (isVariableReference()) { - evalVarReference(context, callback); - } else { - env.getTargetsMatchingPattern(this, pattern, callback); + try { + callback.process(value); + return env.immediateSuccessfulFuture(null); + } catch (QueryException e) { + return env.immediateFailedFuture(e); + } catch (InterruptedException e) { + return env.immediateCancelledFuture(); } } @Override - protected <T> void parEvalImpl( - QueryEnvironment<T> env, - VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { + public <T> QueryTaskFuture<Void> eval( + QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) { if (isVariableReference()) { - evalVarReference(context, callback); + return evalVarReference(env, context, callback); } else { - env.getTargetsMatchingPatternPar(this, pattern, callback, forkJoinPool); + return env.getTargetsMatchingPattern(this, pattern, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java index 956e604a44..d9ed576a5d 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java @@ -15,9 +15,11 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting; import java.util.ArrayList; import java.util.Collection; @@ -27,7 +29,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A tests(x) filter expression, which returns all the tests in set x, @@ -62,15 +63,15 @@ class TestsFunction implements QueryFunction { } @Override - public <T> void eval( + public <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expression, List<Argument> args, - final Callback<T> callback) throws QueryException, InterruptedException { + final Callback<T> callback) { final Closure<T> closure = new Closure<>(expression, env); - env.eval(args.get(0).getExpression(), context, new Callback<T>() { + return env.eval(args.get(0).getExpression(), context, new Callback<T>() { @Override public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { for (T target : partialResult) { @@ -86,17 +87,6 @@ class TestsFunction implements QueryFunction { }); } - @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); - } - /** * Decides whether to include a test in a test_suite or not. * @param testTags Collection of all tags exhibited by a given test. @@ -151,10 +141,8 @@ class TestsFunction implements QueryFunction { } } - /** - * A closure over the temporary state needed to compute the expression. This makes the evaluation - * thread-safe, as long as instances of this class are used only within a single thread. - */ + /** A closure over the temporary state needed to compute the expression. */ + @ThreadSafe private static final class Closure<T> { private final QueryExpression expression; /** A dynamically-populated mapping from test_suite rules to their tests. */ @@ -177,7 +165,8 @@ class TestsFunction implements QueryFunction { * * @precondition env.getAccessor().isTestSuite(testSuite) */ - private Set<T> getTestsInSuite(T testSuite) throws QueryException, InterruptedException { + private synchronized Set<T> getTestsInSuite(T testSuite) + throws QueryException, InterruptedException { Set<T> tests = testsInSuite.get(testSuite); if (tests == null) { tests = Sets.newHashSet(); diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java deleted file mode 100644 index 950335e38a..0000000000 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java +++ /dev/null @@ -1,23 +0,0 @@ -// Copyright 2014 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.query2.engine; - -import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.util.ThreadSafeBatchCallback; - -/** Marker interface for a {@link Callback} that is {@link ThreadSafe}. */ -@ThreadSafe -public interface ThreadSafeCallback<T> - extends Callback<T>, ThreadSafeBatchCallback<T, QueryException> { -} diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java index 747185582f..bc3eb59f84 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java @@ -1,4 +1,4 @@ -// Copyright 2016 The Bazel Authors. All rights reserved. +// Copyright 2017 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,8 +15,7 @@ package com.google.devtools.build.lib.query2.engine; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -/** Marker interface for a {@link ThreadSafe} {@link Uniquifier}. */ +/** A marker parent class for a {@link ThreadSafe} {@link OutputFormatterCallback}. */ @ThreadSafe -public interface ThreadSafeUniquifier<T> extends Uniquifier<T> { -} - +public abstract class ThreadSafeOutputFormatterCallback<T> extends OutputFormatterCallback<T> { +}
\ No newline at end of file diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java index ed2b2376fa..5f8faf56b1 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java @@ -14,8 +14,10 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; /** A helper for deduping values. */ +@ThreadSafe public interface Uniquifier<T> { /** Returns whether {@code newElement} has been seen before. */ boolean unique(T newElement); diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java index 532f331378..b09910c715 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java @@ -14,13 +14,14 @@ package com.google.devtools.build.lib.query2.engine; +import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A visible(x, y) query expression, which computes the subset of nodes in y @@ -52,34 +53,32 @@ public class VisibleFunction implements QueryFunction { } @Override - public <T> void eval( + public <T> QueryTaskFuture<Void> eval( final QueryEnvironment<T> env, - VariableContext<T> context, + final VariableContext<T> context, QueryExpression expression, - List<Argument> args, - final Callback<T> callback) throws QueryException, InterruptedException { - final Set<T> toSet = QueryUtil.evalAll(env, context, args.get(0).getExpression()); - env.eval(args.get(1).getExpression(), context, new Callback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - for (T t : partialResult) { - if (visibleToAll(env, toSet, t)) { - callback.process(ImmutableList.of(t)); + final List<Argument> args, + final Callback<T> callback) { + final QueryTaskFuture<Set<T>> toSetFuture = + QueryUtil.evalAll(env, context, args.get(0).getExpression()); + Function<Set<T>, QueryTaskFuture<Void>> computeVisibleNodesAsyncFunction = + new Function<Set<T>, QueryTaskFuture<Void>>() { + @Override + public QueryTaskFuture<Void> apply(final Set<T> toSet) { + return env.eval(args.get(1).getExpression(), context, new Callback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + for (T t : partialResult) { + if (visibleToAll(env, toSet, t)) { + callback.process(ImmutableList.of(t)); + } + } + } + }); } - } - } - }); - } - - @Override - public <T> void parEval( - QueryEnvironment<T> env, - VariableContext<T> context, - QueryExpression expression, - List<Argument> args, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - eval(env, context, expression, args, callback); + }; + return env.transformAsync(toSetFuture, computeVisibleNodesAsyncFunction); } /** Returns true if {@code target} is visible to all targets in {@code toSet}. */ diff --git a/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java index 18890c3eb6..5c314b6a5c 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java +++ b/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java @@ -35,6 +35,8 @@ import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.packages.TriState; import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment; +import com.google.devtools.build.lib.query2.engine.SynchronizedDelegatingOutputFormatterCallback; +import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback; import com.google.devtools.build.lib.query2.output.QueryOptions.OrderOutput; import com.google.devtools.build.lib.syntax.EvalUtils; import com.google.devtools.build.lib.syntax.Printer; @@ -178,15 +180,16 @@ public abstract class OutputFormatter implements Serializable { void setOptions(QueryOptions options, AspectResolver aspectResolver); /** - * Returns a {@link OutputFormatterCallback} whose {@link OutputFormatterCallback#process} - * outputs formatted {@link Target}s to the given {@code out}. + * Returns a {@link ThreadSafeOutputFormatterCallback} whose + * {@link OutputFormatterCallback#process} outputs formatted {@link Target}s to the given + * {@code out}. * * <p>Takes any options specified via the most recent call to {@link #setOptions} into * consideration. * * <p>Intended to be use for streaming out during evaluation of a query. */ - OutputFormatterCallback<Target> createStreamCallback( + ThreadSafeOutputFormatterCallback<Target> createStreamCallback( OutputStream out, QueryOptions options, QueryEnvironment<?> env); /** @@ -288,9 +291,10 @@ public abstract class OutputFormatter implements Serializable { } @Override - public OutputFormatterCallback<Target> createStreamCallback( + public ThreadSafeOutputFormatterCallback<Target> createStreamCallback( OutputStream out, QueryOptions options, QueryEnvironment<?> env) { - return createPostFactoStreamCallback(out, options); + return new SynchronizedDelegatingOutputFormatterCallback<>( + createPostFactoStreamCallback(out, options)); } } @@ -345,9 +349,10 @@ public abstract class OutputFormatter implements Serializable { } @Override - public OutputFormatterCallback<Target> createStreamCallback( + public ThreadSafeOutputFormatterCallback<Target> createStreamCallback( OutputStream out, QueryOptions options, QueryEnvironment<?> env) { - return createPostFactoStreamCallback(out, options); + return new SynchronizedDelegatingOutputFormatterCallback<>( + createPostFactoStreamCallback(out, options)); } } @@ -387,9 +392,10 @@ public abstract class OutputFormatter implements Serializable { } @Override - public OutputFormatterCallback<Target> createStreamCallback( + public ThreadSafeOutputFormatterCallback<Target> createStreamCallback( OutputStream out, QueryOptions options, QueryEnvironment<?> env) { - return createPostFactoStreamCallback(out, options); + return new SynchronizedDelegatingOutputFormatterCallback<>( + createPostFactoStreamCallback(out, options)); } } @@ -478,9 +484,10 @@ public abstract class OutputFormatter implements Serializable { } @Override - public OutputFormatterCallback<Target> createStreamCallback( + public ThreadSafeOutputFormatterCallback<Target> createStreamCallback( OutputStream out, QueryOptions options, QueryEnvironment<?> env) { - return createPostFactoStreamCallback(out, options); + return new SynchronizedDelegatingOutputFormatterCallback<>( + createPostFactoStreamCallback(out, options)); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java index 66e1182c74..f966afb88d 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java +++ b/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java @@ -41,6 +41,8 @@ import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.FakeSubincludeTarget; import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment; +import com.google.devtools.build.lib.query2.engine.SynchronizedDelegatingOutputFormatterCallback; +import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback; import com.google.devtools.build.lib.query2.output.AspectResolver.BuildFileDependencyMode; import com.google.devtools.build.lib.query2.output.OutputFormatter.AbstractUnorderedFormatter; import com.google.devtools.build.lib.query2.output.QueryOptions.OrderOutput; @@ -50,7 +52,6 @@ import com.google.devtools.build.lib.query2.proto.proto2api.Build.QueryResult.Bu import com.google.devtools.build.lib.query2.proto.proto2api.Build.SourceFile; import com.google.devtools.build.lib.syntax.Environment; import com.google.devtools.build.lib.syntax.Type; - import java.io.IOException; import java.io.OutputStream; import java.util.Collection; @@ -130,9 +131,10 @@ public class ProtoOutputFormatter extends AbstractUnorderedFormatter { } @Override - public OutputFormatterCallback<Target> createStreamCallback( + public ThreadSafeOutputFormatterCallback<Target> createStreamCallback( OutputStream out, QueryOptions options, QueryEnvironment<?> env) { - return createPostFactoStreamCallback(out, options); + return new SynchronizedDelegatingOutputFormatterCallback<>( + createPostFactoStreamCallback(out, options)); } private static Iterable<Target> getSortedLabels(Digraph<Target> result) { diff --git a/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java index 193ef18cf7..6784ed3917 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java +++ b/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java @@ -29,10 +29,11 @@ import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.FakeSubincludeTarget; import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment; +import com.google.devtools.build.lib.query2.engine.SynchronizedDelegatingOutputFormatterCallback; +import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback; import com.google.devtools.build.lib.query2.output.AspectResolver.BuildFileDependencyMode; import com.google.devtools.build.lib.query2.output.OutputFormatter.AbstractUnorderedFormatter; import com.google.devtools.build.lib.syntax.Type; - import java.io.IOException; import java.io.OutputStream; import java.util.Collection; @@ -62,9 +63,10 @@ class XmlOutputFormatter extends AbstractUnorderedFormatter { } @Override - public OutputFormatterCallback<Target> createStreamCallback( + public ThreadSafeOutputFormatterCallback<Target> createStreamCallback( OutputStream out, QueryOptions options, QueryEnvironment<?> env) { - return createPostFactoStreamCallback(out, options); + return new SynchronizedDelegatingOutputFormatterCallback<>( + createPostFactoStreamCallback(out, options)); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java index 589822b805..e01aa4fcbd 100644 --- a/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java +++ b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java @@ -55,7 +55,6 @@ import com.google.devtools.build.lib.query2.engine.DigraphQueryEvalResult; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting; import com.google.devtools.build.lib.query2.engine.QueryException; -import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener; import com.google.devtools.build.lib.query2.engine.QueryUtil; import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllOutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.SkyframeRestartQueryException; @@ -284,7 +283,7 @@ public class GenQuery implements RuleConfiguredTargetFactory { DigraphQueryEvalResult<Target> queryResult; OutputFormatter formatter; AggregateAllOutputFormatterCallback<Target> targets = - QueryUtil.newAggregateAllOutputFormatterCallback(); + QueryUtil.newOrderedAggregateAllOutputFormatterCallback(); try { Set<Setting> settings = queryOptions.toSettings(); @@ -318,7 +317,6 @@ public class GenQuery implements RuleConfiguredTargetFactory { getEventHandler(ruleContext), settings, ImmutableList.<QueryFunction>of(), - QueryExpressionEvalListener.NullListener.<Target>instance(), /*packagePath=*/null); queryResult = (DigraphQueryEvalResult<Target>) queryEnvironment.evaluateQuery(query, targets); } catch (SkyframeRestartQueryException e) { diff --git a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java index c52a3fe0d9..118f529220 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java @@ -23,14 +23,13 @@ import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.pkgcache.PackageCacheOptions; import com.google.devtools.build.lib.query2.AbstractBlazeQueryEnvironment; -import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting; import com.google.devtools.build.lib.query2.engine.QueryEvalResult; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; -import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener; import com.google.devtools.build.lib.query2.engine.QueryUtil; import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllOutputFormatterCallback; +import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback; import com.google.devtools.build.lib.query2.output.OutputFormatter; import com.google.devtools.build.lib.query2.output.OutputFormatter.StreamedFormatter; import com.google.devtools.build.lib.query2.output.QueryOptions; @@ -150,7 +149,7 @@ public final class QueryCommand implements BlazeCommand { expr = queryEnv.transformParsedQuery(expr); OutputStream out = env.getReporter().getOutErr().getOutputStream(); - OutputFormatterCallback<Target> callback; + ThreadSafeOutputFormatterCallback<Target> callback; if (streamResults) { disableAnsiCharactersFiltering(env); @@ -161,7 +160,7 @@ public final class QueryCommand implements BlazeCommand { queryOptions.aspectDeps.createResolver(env.getPackageManager(), env.getReporter())); callback = streamedFormatter.createStreamCallback(out, queryOptions, queryEnv); } else { - callback = QueryUtil.newAggregateAllOutputFormatterCallback(); + callback = QueryUtil.newOrderedAggregateAllOutputFormatterCallback(); } boolean catastrophe = true; try { @@ -207,8 +206,7 @@ public final class QueryCommand implements BlazeCommand { // 3. Output results: try { - Set<Target> targets = - ((AggregateAllOutputFormatterCallback<Target>) callback).getResult(); + Set<Target> targets = ((AggregateAllOutputFormatterCallback<Target>) callback).getResult(); QueryOutputUtils.output( queryOptions, result, @@ -277,7 +275,6 @@ public final class QueryCommand implements BlazeCommand { env.getReporter(), settings, env.getRuntime().getQueryFunctions(), - QueryExpressionEvalListener.NullListener.<Target>instance(), env.getPackageManager().getPackagePath()); } } diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java index df6351a513..3b4b768793 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java @@ -37,7 +37,6 @@ import com.google.devtools.build.lib.skyframe.EnvironmentBackedRecursivePackageP import com.google.devtools.build.lib.util.BatchCallback; import com.google.devtools.build.lib.util.BatchCallback.NullCallback; import com.google.devtools.build.lib.util.Preconditions; -import com.google.devtools.build.lib.util.ThreadSafeBatchCallback; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.RootedPath; @@ -47,7 +46,6 @@ import com.google.devtools.build.skyframe.SkyKey; import com.google.devtools.build.skyframe.SkyValue; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -131,7 +129,7 @@ public class PrepareDepsOfPatternFunction implements SkyFunction { * transitive dependencies. Its methods may throw {@link MissingDepException} if the package * values this depends on haven't been calculated and added to its environment. */ - static class DepsOfPatternPreparer implements TargetPatternResolver<Void> { + static class DepsOfPatternPreparer extends TargetPatternResolver<Void> { private final EnvironmentBackedRecursivePackageProvider packageProvider; private final Environment env; @@ -230,7 +228,8 @@ public class PrepareDepsOfPatternFunction implements SkyFunction { String directory, boolean rulesOnly, ImmutableSet<PathFragment> excludedSubdirectories, - BatchCallback<Void, E> callback, Class<E> exceptionClass) + BatchCallback<Void, E> callback, + Class<E> exceptionClass) throws TargetParsingException, E, InterruptedException { FilteringPolicy policy = rulesOnly ? FilteringPolicies.RULES_ONLY : FilteringPolicies.NO_FILTER; @@ -261,26 +260,5 @@ public class PrepareDepsOfPatternFunction implements SkyFunction { } } } - - @Override - public <E extends Exception> void findTargetsBeneathDirectoryPar( - RepositoryName repository, - String originalPattern, - String directory, - boolean rulesOnly, - ImmutableSet<PathFragment> excludedSubdirectories, - ThreadSafeBatchCallback<Void, E> callback, - Class<E> exceptionClass, - ForkJoinPool forkJoinPool) - throws TargetParsingException, E, InterruptedException { - findTargetsBeneathDirectory( - repository, - originalPattern, - directory, - rulesOnly, - excludedSubdirectories, - callback, - exceptionClass); - } } } diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java index 5c6bd42ee8..dc84f5b6a1 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java @@ -22,6 +22,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; @@ -29,7 +32,6 @@ import com.google.devtools.build.lib.cmdline.RepositoryName; import com.google.devtools.build.lib.cmdline.ResolvedTargets; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPatternResolver; -import com.google.devtools.build.lib.concurrent.MoreFutures; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; import com.google.devtools.build.lib.events.Event; @@ -51,9 +53,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -61,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ @ThreadCompatible public class RecursivePackageProviderBackedTargetPatternResolver - implements TargetPatternResolver<Target> { + extends TargetPatternResolver<Target> { // TODO(janakr): Move this to a more generic place and unify with SkyQueryEnvironment's value? private static final int MAX_PACKAGES_BULK_GET = 1000; @@ -194,56 +193,64 @@ public class RecursivePackageProviderBackedTargetPatternResolver BatchCallback<Target, E> callback, Class<E> exceptionClass) throws TargetParsingException, E, InterruptedException { - findTargetsBeneathDirectoryParImpl( - repository, - originalPattern, - directory, - rulesOnly, - excludedSubdirectories, - new SynchronizedBatchCallback<Target, E>(callback), - exceptionClass, - MoreExecutors.newDirectExecutorService()); + try { + findTargetsBeneathDirectoryAsyncImpl( + repository, + originalPattern, + directory, + rulesOnly, + excludedSubdirectories, + new SynchronizedBatchCallback<Target, E>(callback), + MoreExecutors.newDirectExecutorService()).get(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + Throwables.propagateIfPossible(cause, TargetParsingException.class, exceptionClass); + throw new IllegalStateException(e.getCause()); + } } @Override - public <E extends Exception> void findTargetsBeneathDirectoryPar( - final RepositoryName repository, - final String originalPattern, + public <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsync( + RepositoryName repository, + String originalPattern, String directory, boolean rulesOnly, ImmutableSet<PathFragment> excludedSubdirectories, - final ThreadSafeBatchCallback<Target, E> callback, + ThreadSafeBatchCallback<Target, E> callback, Class<E> exceptionClass, - ForkJoinPool forkJoinPool) - throws TargetParsingException, E, InterruptedException { - findTargetsBeneathDirectoryParImpl( + ListeningExecutorService executor) { + return findTargetsBeneathDirectoryAsyncImpl( repository, originalPattern, directory, rulesOnly, excludedSubdirectories, - callback, - exceptionClass, - forkJoinPool); + new SynchronizedBatchCallback<Target, E>(callback), + executor); } - private <E extends Exception> void findTargetsBeneathDirectoryParImpl( + private <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsyncImpl( final RepositoryName repository, final String originalPattern, String directory, boolean rulesOnly, ImmutableSet<PathFragment> excludedSubdirectories, final ThreadSafeBatchCallback<Target, E> callback, - Class<E> exceptionClass, - ExecutorService executor) - throws TargetParsingException, E, InterruptedException { + ListeningExecutorService executor) { final FilteringPolicy actualPolicy = rulesOnly ? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy) : policy; - PathFragment pathFragment = TargetPatternResolverUtil.getPathFragment(directory); - Iterable<PathFragment> packagesUnderDirectory = - recursivePackageProvider.getPackagesUnderDirectory( - repository, pathFragment, excludedSubdirectories); + final PathFragment pathFragment; + Iterable<PathFragment> packagesUnderDirectory; + try { + pathFragment = TargetPatternResolverUtil.getPathFragment(directory); + packagesUnderDirectory = recursivePackageProvider.getPackagesUnderDirectory( + repository, pathFragment, excludedSubdirectories); + } catch (TargetParsingException e) { + return Futures.immediateFailedFuture(e); + } catch (InterruptedException e) { + return Futures.immediateCancelledFuture(); + } Iterable<PackageIdentifier> pkgIds = Iterables.transform(packagesUnderDirectory, new Function<PathFragment, PackageIdentifier>() { @@ -258,9 +265,9 @@ public class RecursivePackageProviderBackedTargetPatternResolver // into batches. List<List<PackageIdentifier>> partitions = ImmutableList.copyOf(Iterables.partition(pkgIds, MAX_PACKAGES_BULK_GET)); - ArrayList<Future<Void>> tasks = new ArrayList<>(partitions.size()); + ArrayList<ListenableFuture<Void>> futures = new ArrayList<>(partitions.size()); for (final Iterable<PackageIdentifier> pkgIdBatch : partitions) { - tasks.add(executor.submit(new Callable<Void>() { + futures.add(executor.submit(new Callable<Void>() { @Override public Void call() throws E, TargetParsingException, InterruptedException { ImmutableSet<PackageIdentifier> pkgIdBatchSet = ImmutableSet.copyOf(pkgIdBatch); @@ -288,17 +295,15 @@ public class RecursivePackageProviderBackedTargetPatternResolver } })); } - try { - MoreFutures.waitForAllInterruptiblyFailFast(tasks); - } catch (ExecutionException e) { - Throwables.propagateIfPossible(e.getCause(), exceptionClass); - Throwables.propagateIfPossible( - e.getCause(), TargetParsingException.class, InterruptedException.class); - throw new IllegalStateException(e); - } - if (!foundTarget.get()) { - throw new TargetParsingException("no targets found beneath '" + pathFragment + "'"); - } + return Futures.whenAllSucceed(futures).call(new Callable<Void>() { + @Override + public Void call() throws TargetParsingException { + if (!foundTarget.get()) { + throw new TargetParsingException("no targets found beneath '" + pathFragment + "'"); + } + return null; + } + }); } private static <T> int calculateSize(Iterable<ResolvedTargets<T>> resolvedTargets) { @@ -308,5 +313,6 @@ public class RecursivePackageProviderBackedTargetPatternResolver } return size; } + } diff --git a/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java b/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java index bc74b7a161..91b4be86ab 100644 --- a/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java +++ b/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java @@ -13,11 +13,14 @@ // limitations under the License. package com.google.devtools.build.lib.util; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; + /** * Callback to be invoked when part of a result has been computed. Allows a client interested in * the result to process it as it is computed, for instance by streaming it, if it is too big to * fit in memory. */ +@ThreadSafe public interface BatchCallback<T, E extends Exception> { /** * Called when part of a result has been computed. |