aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2017-02-27 21:55:56 +0000
committerGravatar Yue Gan <yueg@google.com>2017-02-28 11:33:08 +0000
commit822c37816ac669e51bec3853b41849a19ec5e230 (patch)
treea12e1f438342aa9ec1846089fc255bf2abb18ad3 /src
parentfb64609c3f1d3492f4d80807f5d91894fa147172 (diff)
Reimplement blaze query using an async evaluation model. Use a concurrent backend for SkyQueryEnvironment's implementation in order to achieve parallelism.
Advantages: -New design has no flaws that the old design had. -Code is structured so that deadlocks due to thread starvation are impossible (yup!). Disadvantages: -The meat of this change needs to all be in a single CL because every single QueryFunction and QueryExpression needs to be rewritten in the async style. Still TODO: -Fully embrace the async model in all QueryFunctions (e.g. 'rdeps', 'allpaths'). -Use concurrency in BlazeQueryEnvironment to achieve parallel evaluation for (non SkyQuery) 'blaze query' and genquery. -- PiperOrigin-RevId: 148690279 MOS_MIGRATED_REVID=148690279
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java4
-rw-r--r--src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java62
-rw-r--r--src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java52
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java71
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java48
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java46
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java5
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java37
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java344
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java194
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java73
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java45
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java202
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java24
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java5
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java19
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java19
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java20
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java25
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java21
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java2
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java188
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java207
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java48
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java67
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java108
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java41
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java50
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java12
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java58
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java82
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java12
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java58
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java40
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java23
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java (renamed from src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java)9
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java2
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java51
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java8
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java8
-rw-r--r--src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java4
-rw-r--r--src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java11
-rw-r--r--src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java28
-rw-r--r--src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java96
-rw-r--r--src/main/java/com/google/devtools/build/lib/util/BatchCallback.java3
47 files changed, 1316 insertions, 1274 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java b/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java
index c0fce7a8c4..b4674481d0 100644
--- a/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java
+++ b/src/main/java/com/google/devtools/build/lib/bazel/commands/FetchCommand.java
@@ -21,10 +21,10 @@ import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.pkgcache.PackageCacheOptions;
import com.google.devtools.build.lib.query2.AbstractBlazeQueryEnvironment;
-import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.runtime.BlazeCommand;
import com.google.devtools.build.lib.runtime.BlazeRuntime;
import com.google.devtools.build.lib.runtime.Command;
@@ -109,7 +109,7 @@ public final class FetchCommand implements BlazeCommand {
// 2. Evaluate expression:
try {
- queryEnv.evaluateQuery(expr, new OutputFormatterCallback<Target>() {
+ queryEnv.evaluateQuery(expr, new ThreadSafeOutputFormatterCallback<Target>() {
@Override
public void processOutput(Iterable<Target> partialResult) {
// Throw away the result.
diff --git a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java
index e997b9f78d..88f891c1f3 100644
--- a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java
+++ b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPattern.java
@@ -19,6 +19,9 @@ import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.devtools.build.lib.cmdline.LabelValidator.BadLabelException;
import com.google.devtools.build.lib.cmdline.LabelValidator.PackageAndTarget;
import com.google.devtools.build.lib.util.BatchCallback;
@@ -26,15 +29,12 @@ import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.StringUtilities;
import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.vfs.PathFragment;
-
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
-
import javax.annotation.concurrent.Immutable;
/**
@@ -157,17 +157,48 @@ public abstract class TargetPattern implements Serializable {
throws TargetParsingException, E, InterruptedException;
/**
- * Same as {@link #eval}, but optionally making use of the given {@link ForkJoinPool} to achieve
- * parallelism.
+ * Evaluates this {@link TargetPattern} synchronously, feeding the result to the given
+ * {@code callback}, and then returns an appropriate immediate {@link ListenableFuture}.
+ *
+ * <p>If the returned {@link ListenableFuture}'s {@link ListenableFuture#get} throws an
+ * {@link ExecutionException}, the cause will be an instance of either
+ * {@link TargetParsingException} or the given {@code exceptionClass}.
+ */
+ public final <T, E extends Exception> ListenableFuture<Void> evalAdaptedForAsync(
+ TargetPatternResolver<T> resolver,
+ ImmutableSet<PathFragment> excludedSubdirectories,
+ ThreadSafeBatchCallback<T, E> callback,
+ Class<E> exceptionClass) {
+ try {
+ eval(resolver, excludedSubdirectories, callback, exceptionClass);
+ return Futures.immediateFuture(null);
+ } catch (TargetParsingException e) {
+ return Futures.immediateFailedFuture(e);
+ } catch (InterruptedException e) {
+ return Futures.immediateCancelledFuture();
+ } catch (Exception e) {
+ if (exceptionClass.isInstance(e)) {
+ return Futures.immediateFailedFuture(exceptionClass.cast(e));
+ }
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Returns a {@link ListenableFuture} representing the asynchronous evaluation of this
+ * {@link TargetPattern} that feeds the results to the given {@code callback}.
+ *
+ * <p>If the returned {@link ListenableFuture}'s {@link ListenableFuture#get} throws an
+ * {@link ExecutionException}, the cause will be an instance of either
+ * {@link TargetParsingException} or the given {@code exceptionClass}.
*/
- public <T, E extends Exception> void parEval(
+ public <T, E extends Exception> ListenableFuture<Void> evalAsync(
TargetPatternResolver<T> resolver,
ImmutableSet<PathFragment> excludedSubdirectories,
ThreadSafeBatchCallback<T, E> callback,
Class<E> exceptionClass,
- ForkJoinPool forkJoinPool)
- throws TargetParsingException, E, InterruptedException {
- eval(resolver, excludedSubdirectories, callback, exceptionClass);
+ ListeningExecutorService executor) {
+ return evalAdaptedForAsync(resolver, excludedSubdirectories, callback, exceptionClass);
}
/**
@@ -252,8 +283,8 @@ public abstract class TargetPattern implements Serializable {
public <T, E extends Exception> void eval(
TargetPatternResolver<T> resolver,
ImmutableSet<PathFragment> excludedSubdirectories,
- BatchCallback<T, E> callback, Class<E> exceptionClass)
- throws TargetParsingException, E, InterruptedException {
+ BatchCallback<T, E> callback,
+ Class<E> exceptionClass) throws TargetParsingException, E, InterruptedException {
Preconditions.checkArgument(excludedSubdirectories.isEmpty(),
"Target pattern \"%s\" of type %s cannot be evaluated with excluded subdirectories: %s.",
getOriginalPattern(), getType(), excludedSubdirectories);
@@ -518,14 +549,13 @@ public abstract class TargetPattern implements Serializable {
}
@Override
- public <T, E extends Exception> void parEval(
+ public <T, E extends Exception> ListenableFuture<Void> evalAsync(
TargetPatternResolver<T> resolver,
ImmutableSet<PathFragment> excludedSubdirectories,
ThreadSafeBatchCallback<T, E> callback,
Class<E> exceptionClass,
- ForkJoinPool forkJoinPool)
- throws TargetParsingException, E, InterruptedException {
- resolver.findTargetsBeneathDirectoryPar(
+ ListeningExecutorService executor) {
+ return resolver.findTargetsBeneathDirectoryAsync(
directory.getRepository(),
getOriginalPattern(),
directory.getPackageFragment().getPathString(),
@@ -533,7 +563,7 @@ public abstract class TargetPattern implements Serializable {
excludedSubdirectories,
callback,
exceptionClass,
- forkJoinPool);
+ executor);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java
index b6b384c882..38b866b68e 100644
--- a/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/cmdline/TargetPatternResolver.java
@@ -15,35 +15,38 @@
package com.google.devtools.build.lib.cmdline;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.devtools.build.lib.util.BatchCallback;
import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.vfs.PathFragment;
import java.util.concurrent.ForkJoinPool;
/**
- * A callback interface that is used during the process of converting target patterns (such as
+ * A callback that is used during the process of converting target patterns (such as
* <code>//foo:all</code>) into one or more lists of targets (such as <code>//foo:foo,
* //foo:bar</code>). During a call to {@link TargetPattern#eval}, the {@link TargetPattern} makes
* calls to this interface to implement the target pattern semantics. The generic type {@code T} is
* only for compile-time type safety; there are no requirements to the actual type.
*/
-public interface TargetPatternResolver<T> {
+public abstract class TargetPatternResolver<T> {
/**
* Reports the given warning.
*/
- void warn(String msg);
+ public abstract void warn(String msg);
/**
* Returns a single target corresponding to the given label, or null. This method may only throw
* an exception if the current thread was interrupted.
*/
- T getTargetOrNull(Label label) throws InterruptedException;
+ public abstract T getTargetOrNull(Label label) throws InterruptedException;
/**
* Returns a single target corresponding to the given label, or an empty or failed result.
*/
- ResolvedTargets<T> getExplicitTarget(Label label)
+ public abstract ResolvedTargets<T> getExplicitTarget(Label label)
throws TargetParsingException, InterruptedException;
/**
@@ -55,7 +58,7 @@ public interface TargetPatternResolver<T> {
* @param packageIdentifier the identifier of the package
* @param rulesOnly whether to return rules only
*/
- ResolvedTargets<T> getTargetsInPackage(String originalPattern,
+ public abstract ResolvedTargets<T> getTargetsInPackage(String originalPattern,
PackageIdentifier packageIdentifier, boolean rulesOnly)
throws TargetParsingException, InterruptedException;
@@ -84,7 +87,7 @@ public interface TargetPatternResolver<T> {
* @param exceptionClass The class type of the parameterized exception.
* @throws TargetParsingException under implementation-specific failure conditions
*/
- <E extends Exception> void findTargetsBeneathDirectory(
+ public abstract <E extends Exception> void findTargetsBeneathDirectory(
RepositoryName repository,
String originalPattern,
String directory,
@@ -98,7 +101,7 @@ public interface TargetPatternResolver<T> {
* Same as {@link #findTargetsBeneathDirectory}, but optionally making use of the given
* {@link ForkJoinPool} to achieve parallelism.
*/
- <E extends Exception> void findTargetsBeneathDirectoryPar(
+ public <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsync(
RepositoryName repository,
String originalPattern,
String directory,
@@ -106,19 +109,38 @@ public interface TargetPatternResolver<T> {
ImmutableSet<PathFragment> excludedSubdirectories,
ThreadSafeBatchCallback<T, E> callback,
Class<E> exceptionClass,
- ForkJoinPool forkJoinPool)
- throws TargetParsingException, E, InterruptedException;
+ ListeningExecutorService executor) {
+ try {
+ findTargetsBeneathDirectory(
+ repository,
+ originalPattern,
+ directory,
+ rulesOnly,
+ excludedSubdirectories,
+ callback,
+ exceptionClass);
+ return Futures.immediateFuture(null);
+ } catch (TargetParsingException e) {
+ return Futures.immediateFailedFuture(e);
+ } catch (InterruptedException e) {
+ return Futures.immediateCancelledFuture();
+ } catch (Exception e) {
+ if (exceptionClass.isInstance(e)) {
+ return Futures.immediateFailedFuture(e);
+ }
+ throw new IllegalStateException(e);
+ }
+ }
/**
* Returns true, if and only if the given package identifier corresponds to a package, i.e., a
- * file with the name {@code packageName/BUILD} exists in the appropriat repository.
+ * file with the name {@code packageName/BUILD} exists in the appropriate repository.
*/
- boolean isPackage(PackageIdentifier packageIdentifier) throws InterruptedException;
+ public abstract boolean isPackage(PackageIdentifier packageIdentifier)
+ throws InterruptedException;
/**
* Returns the target kind of the given target, for example {@code cc_library rule}.
*/
- String getTargetKind(T target);
-
-
+ public abstract String getTargetKind(T target);
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
index 644adcf93d..b0a18cccc9 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/AbstractBlazeQueryEnvironment.java
@@ -24,15 +24,15 @@ import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
import com.google.devtools.build.lib.packages.DependencyFilter;
import com.google.devtools.build.lib.packages.Target;
+import com.google.devtools.build.lib.query2.engine.AbstractQueryEnvironment;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
import com.google.devtools.build.lib.query2.engine.QueryUtil;
import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.VariableContext;
import com.google.devtools.build.lib.util.Preconditions;
import java.io.IOException;
@@ -47,8 +47,7 @@ import java.util.logging.Logger;
* {@link QueryEnvironment} that can evaluate queries to produce a result, and implements as much of
* QueryEnvironment as possible while remaining mostly agnostic as to the objects being stored.
*/
-public abstract class AbstractBlazeQueryEnvironment<T>
- implements QueryEnvironment<T> {
+public abstract class AbstractBlazeQueryEnvironment<T> extends AbstractQueryEnvironment<T> {
protected ErrorSensingEventHandler eventHandler;
protected final boolean keepGoing;
protected final boolean strictScope;
@@ -58,7 +57,6 @@ public abstract class AbstractBlazeQueryEnvironment<T>
protected final Set<Setting> settings;
protected final List<QueryFunction> extraFunctions;
- private final QueryExpressionEvalListener<T> evalListener;
private static final Logger logger =
Logger.getLogger(AbstractBlazeQueryEnvironment.class.getName());
@@ -69,8 +67,7 @@ public abstract class AbstractBlazeQueryEnvironment<T>
Predicate<Label> labelFilter,
ExtendedEventHandler eventHandler,
Set<Setting> settings,
- Iterable<QueryFunction> extraFunctions,
- QueryExpressionEvalListener<T> evalListener) {
+ Iterable<QueryFunction> extraFunctions) {
this.eventHandler = new ErrorSensingEventHandler(eventHandler);
this.keepGoing = keepGoing;
this.strictScope = strictScope;
@@ -78,7 +75,6 @@ public abstract class AbstractBlazeQueryEnvironment<T>
this.labelFilter = labelFilter;
this.settings = Sets.immutableEnumSet(settings);
this.extraFunctions = ImmutableList.copyOf(extraFunctions);
- this.evalListener = evalListener;
}
private static DependencyFilter constructDependencyFilter(
@@ -103,7 +99,7 @@ public abstract class AbstractBlazeQueryEnvironment<T>
*/
protected void evalTopLevelInternal(QueryExpression expr, OutputFormatterCallback<T> callback)
throws QueryException, InterruptedException {
- eval(expr, VariableContext.<T>empty(), callback);
+ ((QueryTaskFutureImpl<Void>) eval(expr, VariableContext.<T>empty(), callback)).getChecked();
}
/**
@@ -120,9 +116,9 @@ public abstract class AbstractBlazeQueryEnvironment<T>
*/
public QueryEvalResult evaluateQuery(
QueryExpression expr,
- final OutputFormatterCallback<T> callback)
+ ThreadSafeOutputFormatterCallback<T> callback)
throws QueryException, InterruptedException, IOException {
- EmptinessSensingCallback<T> emptySensingCallback = createEmptinessSensingCallback(callback);
+ EmptinessSensingCallback<T> emptySensingCallback = new EmptinessSensingCallback<>(callback);
long startTime = System.currentTimeMillis();
// In the --nokeep_going case, errors are reported in the order in which the patterns are
// specified; using a linked hash set here makes sure that the left-most error is reported.
@@ -175,13 +171,6 @@ public abstract class AbstractBlazeQueryEnvironment<T>
return new QueryEvalResult(!eventHandler.hasErrors(), emptySensingCallback.isEmpty());
}
- private static <T> EmptinessSensingCallback<T> createEmptinessSensingCallback(
- OutputFormatterCallback<T> callback) {
- return (callback instanceof ThreadSafeCallback)
- ? new ThreadSafeEmptinessSensingCallback<>(callback)
- : new EmptinessSensingCallback<>(callback);
- }
-
private static class EmptinessSensingCallback<T> extends OutputFormatterCallback<T> {
private final OutputFormatterCallback<T> callback;
private final AtomicBoolean empty = new AtomicBoolean(true);
@@ -212,19 +201,11 @@ public abstract class AbstractBlazeQueryEnvironment<T>
}
}
- private static class ThreadSafeEmptinessSensingCallback<T>
- extends EmptinessSensingCallback<T> implements ThreadSafeCallback<T> {
- private ThreadSafeEmptinessSensingCallback(OutputFormatterCallback<T> callback) {
- super(callback);
- Preconditions.checkState(callback instanceof ThreadSafeCallback);
- }
- }
-
public QueryExpression transformParsedQuery(QueryExpression queryExpression) {
return queryExpression;
}
- public QueryEvalResult evaluateQuery(String query, OutputFormatterCallback<T> callback)
+ public QueryEvalResult evaluateQuery(String query, ThreadSafeOutputFormatterCallback<T> callback)
throws QueryException, InterruptedException, IOException {
return evaluateQuery(
QueryExpression.parse(query, this), callback);
@@ -256,17 +237,32 @@ public abstract class AbstractBlazeQueryEnvironment<T>
return true;
}
- public Set<T> evalTargetPattern(QueryExpression caller, String pattern)
- throws QueryException, InterruptedException {
+ public QueryTaskFuture<Set<T>> evalTargetPattern(QueryExpression caller, String pattern) {
try {
preloadOrThrow(caller, ImmutableList.of(pattern));
- } catch (TargetParsingException e) {
- // Will skip the target and keep going if -k is specified.
- reportBuildFileError(caller, e.getMessage());
+ } catch (TargetParsingException tpe) {
+ try {
+ // Will skip the target and keep going if -k is specified.
+ reportBuildFileError(caller, tpe.getMessage());
+ } catch (QueryException qe) {
+ return immediateFailedFuture(qe);
+ }
+ } catch (QueryException qe) {
+ return immediateFailedFuture(qe);
+ } catch (InterruptedException e) {
+ return immediateCancelledFuture();
}
- AggregateAllCallback<T> aggregatingCallback = QueryUtil.newAggregateAllCallback();
- getTargetsMatchingPattern(caller, pattern, aggregatingCallback);
- return aggregatingCallback.getResult();
+ final AggregateAllCallback<T> aggregatingCallback = QueryUtil.newAggregateAllCallback();
+ QueryTaskFuture<Void> evalFuture =
+ getTargetsMatchingPattern(caller, pattern, aggregatingCallback);
+ return whenSucceedsCall(
+ evalFuture,
+ new QueryTaskCallable<Set<T>>() {
+ @Override
+ public Set<T> call() {
+ return aggregatingCallback.getResult();
+ }
+ });
}
/**
@@ -289,9 +285,4 @@ public abstract class AbstractBlazeQueryEnvironment<T>
builder.addAll(extraFunctions);
return builder.build();
}
-
- @Override
- public QueryExpressionEvalListener<T> getEvalListener() {
- return evalListener;
- }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
index 867ec3d8bf..de151aba0b 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/BlazeQueryEnvironment.java
@@ -37,18 +37,13 @@ import com.google.devtools.build.lib.pkgcache.TargetProvider;
import com.google.devtools.build.lib.pkgcache.TransitivePackageLoader;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.DigraphQueryEvalResult;
-import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
-import com.google.devtools.build.lib.query2.engine.QueryUtil;
import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
import com.google.devtools.build.lib.query2.engine.SkyframeRestartQueryException;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
-import com.google.devtools.build.lib.query2.engine.VariableContext;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.PathFragment;
import java.io.IOException;
@@ -61,7 +56,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* The environment of a Blaze query. Not thread-safe.
@@ -108,10 +102,8 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
Predicate<Label> labelFilter,
ExtendedEventHandler eventHandler,
Set<Setting> settings,
- Iterable<QueryFunction> extraFunctions,
- QueryExpressionEvalListener<Target> evalListener) {
- super(
- keepGoing, strictScope, labelFilter, eventHandler, settings, extraFunctions, evalListener);
+ Iterable<QueryFunction> extraFunctions) {
+ super(keepGoing, strictScope, labelFilter, eventHandler, settings, extraFunctions);
this.targetPatternEvaluator = targetPatternEvaluator;
this.transitivePackageLoader = transitivePackageLoader;
this.targetProvider = targetProvider;
@@ -123,7 +115,7 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
@Override
public DigraphQueryEvalResult<Target> evaluateQuery(
QueryExpression expr,
- final OutputFormatterCallback<Target> callback)
+ ThreadSafeOutputFormatterCallback<Target> callback)
throws QueryException, InterruptedException, IOException {
eventHandler.resetErrors();
resolvedTargetPatterns.clear();
@@ -133,8 +125,19 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
@Override
- public void getTargetsMatchingPattern(
- QueryExpression caller, String pattern, Callback<Target> callback)
+ public QueryTaskFuture<Void> getTargetsMatchingPattern(
+ QueryExpression owner, String pattern, Callback<Target> callback) {
+ try {
+ getTargetsMatchingPatternImpl(pattern, callback);
+ return immediateSuccessfulFuture(null);
+ } catch (QueryException e) {
+ return immediateFailedFuture(e);
+ } catch (InterruptedException e) {
+ return immediateCancelledFuture();
+ }
+ }
+
+ private void getTargetsMatchingPatternImpl(String pattern, Callback<Target> callback)
throws QueryException, InterruptedException {
// We can safely ignore the boolean error flag. The evaluateQuery() method above wraps the
// entire query computation in an error sensor.
@@ -192,15 +195,6 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
@Override
- public void getTargetsMatchingPatternPar(
- QueryExpression caller,
- String pattern,
- ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- getTargetsMatchingPattern(caller, pattern, callback);
- }
-
- @Override
public Target getTarget(Label label)
throws TargetNotFoundException, QueryException, InterruptedException {
// Can't use strictScope here because we are expecting a target back.
@@ -293,14 +287,6 @@ public class BlazeQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
@Override
- public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback)
- throws QueryException, InterruptedException {
- AggregateAllCallback<Target> aggregator = QueryUtil.newAggregateAllCallback();
- expr.eval(this, context, aggregator);
- callback.process(aggregator.getResult());
- }
-
- @Override
public Uniquifier<Target> createUniquifier() {
return new AbstractUniquifier<Target, Label>() {
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 30a27f8e63..c3e632ebb4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -36,10 +36,10 @@ import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.Callback;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.query2.engine.VariableContext;
import com.google.devtools.build.lib.skyframe.PackageValue;
import com.google.devtools.build.lib.skyframe.SkyFunctions;
@@ -77,14 +77,13 @@ class ParallelSkyQueryUtils {
* Specialized parallel variant of {@link SkyQueryEnvironment#getAllRdeps} that is appropriate
* when there is no depth-bound.
*/
- static void getAllRdepsUnboundedParallel(
+ static QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
SkyQueryEnvironment env,
QueryExpression expression,
VariableContext<Target> context,
- ThreadSafeCallback<Target> callback,
- MultisetSemaphore<PackageIdentifier> packageSemaphore)
- throws QueryException, InterruptedException {
- env.eval(
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
+ return env.eval(
expression,
context,
new SkyKeyBFSVisitorCallback(
@@ -95,10 +94,10 @@ class ParallelSkyQueryUtils {
static void getRBuildFilesParallel(
SkyQueryEnvironment env,
Collection<PathFragment> fileIdentifiers,
- ThreadSafeCallback<Target> callback,
+ Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore)
throws QueryException, InterruptedException {
- ThreadSafeUniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
+ Uniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
RBuildFilesVisitor visitor =
new RBuildFilesVisitor(env, keyUniquifier, callback, packageSemaphore);
visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers));
@@ -110,7 +109,7 @@ class ParallelSkyQueryUtils {
private RBuildFilesVisitor(
SkyQueryEnvironment env,
- ThreadSafeUniquifier<SkyKey> uniquifier,
+ Uniquifier<SkyKey> uniquifier,
Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
super(env, uniquifier, callback);
@@ -180,8 +179,8 @@ class ParallelSkyQueryUtils {
private AllRdepsUnboundedVisitor(
SkyQueryEnvironment env,
- ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier,
- ThreadSafeCallback<Target> callback,
+ Uniquifier<Pair<SkyKey, SkyKey>> uniquifier,
+ Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
super(env, uniquifier, callback);
this.packageSemaphore = packageSemaphore;
@@ -190,19 +189,18 @@ class ParallelSkyQueryUtils {
/**
* A {@link Factory} for {@link AllRdepsUnboundedVisitor} instances, each of which will be used
* to perform visitation of the reverse transitive closure of the {@link Target}s passed in a
- * single {@link ThreadSafeCallback#process} call. Note that all the created
- * instances share the same {@code ThreadSafeUniquifier<SkyKey>} so that we don't visit the
- * same Skyframe node more than once.
+ * single {@link Callback#process} call. Note that all the created instances share the same
+ * {@link Uniquifier} so that we don't visit the same Skyframe node more than once.
*/
private static class Factory implements AbstractSkyKeyBFSVisitor.Factory {
private final SkyQueryEnvironment env;
- private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier;
- private final ThreadSafeCallback<Target> callback;
+ private final Uniquifier<Pair<SkyKey, SkyKey>> uniquifier;
+ private final Callback<Target> callback;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private Factory(
SkyQueryEnvironment env,
- ThreadSafeCallback<Target> callback,
+ Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
this.env = env;
this.uniquifier = env.createReverseDepSkyKeyUniquifier();
@@ -341,10 +339,10 @@ class ParallelSkyQueryUtils {
}
/**
- * A {@link ThreadSafeCallback} whose {@link ThreadSafeCallback#process} method kicks off a BFS
- * visitation via a fresh {@link AbstractSkyKeyBFSVisitor} instance.
+ * A {@link Callback} whose {@link Callback#process} method kicks off a BFS visitation via a fresh
+ * {@link AbstractSkyKeyBFSVisitor} instance.
*/
- private static class SkyKeyBFSVisitorCallback implements ThreadSafeCallback<Target> {
+ private static class SkyKeyBFSVisitorCallback implements Callback<Target> {
private final AbstractSkyKeyBFSVisitor.Factory visitorFactory;
private SkyKeyBFSVisitorCallback(AbstractSkyKeyBFSVisitor.Factory visitorFactory) {
@@ -355,6 +353,8 @@ class ParallelSkyQueryUtils {
public void process(Iterable<Target> partialResult)
throws QueryException, InterruptedException {
AbstractSkyKeyBFSVisitor<?> visitor = visitorFactory.create();
+ // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on
+ // another, potentially expensive computation. Refactor to something like "processAsync".
visitor.visitAndWaitForCompletion(
SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult));
}
@@ -370,7 +370,7 @@ class ParallelSkyQueryUtils {
@ThreadSafe
private abstract static class AbstractSkyKeyBFSVisitor<T> {
protected final SkyQueryEnvironment env;
- private final ThreadSafeUniquifier<T> uniquifier;
+ private final Uniquifier<T> uniquifier;
private final Callback<Target> callback;
private final QuiescingExecutor executor;
@@ -434,7 +434,7 @@ class ParallelSkyQueryUtils {
new ThreadFactoryBuilder().setNameFormat("skykey-bfs-visitor %d").build());
private AbstractSkyKeyBFSVisitor(
- SkyQueryEnvironment env, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) {
+ SkyQueryEnvironment env, Uniquifier<T> uniquifier, Callback<Target> callback) {
this.env = env;
this.uniquifier = uniquifier;
this.callback = callback;
diff --git a/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java b/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java
index f8c8327627..d43f71beb2 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/QueryEnvironmentFactory.java
@@ -24,7 +24,6 @@ import com.google.devtools.build.lib.pkgcache.TargetProvider;
import com.google.devtools.build.lib.pkgcache.TransitivePackageLoader;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
import java.util.List;
@@ -48,7 +47,6 @@ public class QueryEnvironmentFactory {
ExtendedEventHandler eventHandler,
Set<Setting> settings,
Iterable<QueryFunction> functions,
- QueryExpressionEvalListener<Target> evalListener,
@Nullable PathPackageLocator packagePath) {
Preconditions.checkNotNull(universeScope);
if (canUseSkyQuery(orderedResults, universeScope, packagePath, strictScope, labelFilter)) {
@@ -58,7 +56,6 @@ public class QueryEnvironmentFactory {
eventHandler,
settings,
functions,
- evalListener,
targetPatternEvaluator.getOffset(),
graphFactory,
universeScope,
@@ -66,7 +63,7 @@ public class QueryEnvironmentFactory {
} else {
return new BlazeQueryEnvironment(transitivePackageLoader, targetProvider,
targetPatternEvaluator, keepGoing, strictScope, loadingPhaseThreads, labelFilter,
- eventHandler, settings, functions, evalListener);
+ eventHandler, settings, functions);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java
index 79614c24a5..e2fb0e4dfd 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java
@@ -22,14 +22,12 @@ import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
import com.google.devtools.build.lib.query2.engine.VariableContext;
import com.google.devtools.build.lib.vfs.PathFragment;
-
import java.util.List;
-import java.util.concurrent.ForkJoinPool;
/**
* An "rbuildfiles" query expression, which computes the set of packages (as represented by their
@@ -69,36 +67,19 @@ public class RBuildFilesFunction implements QueryFunction {
@Override
@SuppressWarnings("unchecked") // Cast from <Target> to <T>. This will only be used with <Target>.
- public <T> void eval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- Callback<T> callback) throws QueryException, InterruptedException {
- if (!(env instanceof SkyQueryEnvironment)) {
- throw new QueryException("rbuildfiles can only be used with SkyQueryEnvironment");
- }
- ((SkyQueryEnvironment) env)
- .getRBuildFiles(
- Collections2.transform(args, ARGUMENT_TO_PATH_FRAGMENT), (Callback<Target>) callback);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T> void parEval(
+ public <T> QueryTaskFuture<Void> eval(
QueryEnvironment<T> env,
VariableContext<T> context,
QueryExpression expression,
List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ Callback<T> callback) {
if (!(env instanceof SkyQueryEnvironment)) {
- throw new QueryException("rbuildfiles can only be used with SkyQueryEnvironment");
+ return env.immediateFailedFuture(
+ new QueryException("rbuildfiles can only be used with SkyQueryEnvironment"));
}
- ((SkyQueryEnvironment) env)
- .getRBuildFilesParallel(
- Collections2.transform(args, ARGUMENT_TO_PATH_FRAGMENT),
- (ThreadSafeCallback<Target>) callback,
- forkJoinPool);
+ SkyQueryEnvironment skyEnv = ((SkyQueryEnvironment) env);
+ return skyEnv.getRBuildFilesParallel(
+ Collections2.transform(args, ARGUMENT_TO_PATH_FRAGMENT),
+ (Callback<Target>) callback);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 3e1410f582..565b6ae519 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -29,14 +29,20 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.LabelSyntaxException;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPattern;
import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.BlockingStack;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
-import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.ExtendedEventHandler;
@@ -56,17 +62,17 @@ import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.FunctionExpression;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
import com.google.devtools.build.lib.query2.engine.RdepsFunction;
import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment;
import com.google.devtools.build.lib.query2.engine.TargetLiteral;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeCallback;
-import com.google.devtools.build.lib.query2.engine.ThreadSafeUniquifier;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.query2.engine.VariableContext;
import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue;
@@ -106,7 +112,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -117,6 +125,10 @@ import javax.annotation.Nullable;
* reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in any
* particular order. As well, this class eagerly loads the full transitive closure of targets, even
* if the full closure isn't needed.
+ *
+ * <p>This class has concurrent implementations of the
+ * {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods. The combination of this and the
+ * asynchronous evaluation model yields parallel query evaluation.
*/
public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
implements StreamableQueryEnvironment<Target> {
@@ -140,7 +152,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
protected WalkableGraph graph;
private InterruptibleSupplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
private GraphBackedRecursivePackageProvider graphBackedRecursivePackageProvider;
- private ForkJoinPool forkJoinPool;
+ private ListeningExecutorService executor;
private RecursivePackageProviderBackedTargetPatternResolver resolver;
private final SkyKey universeKey;
private final ImmutableList<TargetPatternKey> universeTargetPatternKeys;
@@ -151,7 +163,6 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
ExtendedEventHandler eventHandler,
Set<Setting> settings,
Iterable<QueryFunction> extraFunctions,
- QueryExpressionEvalListener<Target> evalListener,
String parserPrefix,
WalkableGraphFactory graphFactory,
List<String> universeScope,
@@ -165,7 +176,6 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
eventHandler,
settings,
extraFunctions,
- evalListener,
parserPrefix,
graphFactory,
universeScope,
@@ -179,7 +189,6 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
ExtendedEventHandler eventHandler,
Set<Setting> settings,
Iterable<QueryFunction> extraFunctions,
- QueryExpressionEvalListener<Target> evalListener,
String parserPrefix,
WalkableGraphFactory graphFactory,
List<String> universeScope,
@@ -190,8 +199,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
/*labelFilter=*/ Rule.ALL_LABELS,
eventHandler,
settings,
- extraFunctions,
- evalListener);
+ extraFunctions);
this.loadingPhaseThreads = loadingPhaseThreads;
this.graphFactory = graphFactory;
this.pkgPath = pkgPath;
@@ -224,9 +232,15 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
graphBackedRecursivePackageProvider =
new GraphBackedRecursivePackageProvider(graph, universeTargetPatternKeys, pkgPath);
}
- if (forkJoinPool == null) {
- forkJoinPool =
- NamedForkJoinPool.newNamedPool("QueryEnvironment", queryEvaluationParallelismLevel);
+ if (executor == null) {
+ executor = MoreExecutors.listeningDecorator(
+ new ThreadPoolExecutor(
+ /*corePoolSize=*/ queryEvaluationParallelismLevel,
+ /*maximumPoolSize=*/ queryEvaluationParallelismLevel,
+ /*keepAliveTime=*/ 1,
+ /*units=*/ TimeUnit.SECONDS,
+ /*workQueue=*/ new BlockingStack<Runnable>(),
+ new ThreadFactoryBuilder().setNameFormat("QueryEnvironment %d").build()));
}
resolver =
new RecursivePackageProviderBackedTargetPatternResolver(
@@ -336,16 +350,17 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
} catch (Throwable throwable) {
throwableToThrow = throwable;
} finally {
- if (throwableToThrow != null) {
- LOG.log(Level.INFO, "About to shutdown FJP because of throwable", throwableToThrow);
+ if (throwableToThrow != null) {
+ LOG.log(
+ Level.INFO,
+ "About to shutdown query threadpool because of throwable",
+ throwableToThrow);
// Force termination of remaining tasks if evaluation failed abruptly (e.g. was
// interrupted). We don't want to leave any dangling threads running tasks.
- forkJoinPool.shutdownNow();
- }
- forkJoinPool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- if (throwableToThrow != null) {
- // Signal that pool must be recreated on the next invocation.
- forkJoinPool = null;
+ executor.shutdownNow();
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ // Signal that executor must be recreated on the next invocation.
+ executor = null;
Throwables.propagateIfPossible(
throwableToThrow, QueryException.class, InterruptedException.class);
}
@@ -354,7 +369,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
@Override
public QueryEvalResult evaluateQuery(
- QueryExpression expr, OutputFormatterCallback<Target> callback)
+ QueryExpression expr, ThreadSafeOutputFormatterCallback<Target> callback)
throws QueryException, InterruptedException, IOException {
// Some errors are reported as QueryExceptions and others as ERROR events (if --keep_going). The
// result is set to have an error iff there were errors emitted during the query, so we reset
@@ -565,38 +580,79 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
return null;
}
+ private <R> ListenableFuture<R> safeSubmit(Callable<R> callable) {
+ try {
+ return executor.submit(callable);
+ } catch (RejectedExecutionException e) {
+ return Futures.immediateCancelledFuture();
+ }
+ }
+
@ThreadSafe
@Override
- public void eval(QueryExpression expr, VariableContext<Target> context, Callback<Target> callback)
- throws QueryException, InterruptedException {
- // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for
- // all QueryEnvironment implementations.
- if (callback instanceof ThreadSafeCallback) {
- expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, forkJoinPool);
- } else {
- expr.eval(this, context, callback);
- }
+ public QueryTaskFuture<Void> eval(
+ final QueryExpression expr,
+ final VariableContext<Target> context,
+ final Callback<Target> callback) {
+ // TODO(bazel-team): As in here, use concurrency for the async #eval of other QueryEnvironment
+ // implementations.
+ Callable<QueryTaskFutureImpl<Void>> task = new Callable<QueryTaskFutureImpl<Void>>() {
+ @Override
+ public QueryTaskFutureImpl<Void> call() {
+ return (QueryTaskFutureImpl<Void>) expr.eval(SkyQueryEnvironment.this, context, callback);
+ }
+ };
+ ListenableFuture<QueryTaskFutureImpl<Void>> futureFuture = safeSubmit(task);
+ return QueryTaskFutureImpl.ofDelegate(Futures.dereference(futureFuture));
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) {
+ return QueryTaskFutureImpl.ofDelegate(safeSubmit(callable));
+ }
+
+ @Override
+ public <T1, T2> QueryTaskFuture<T2> transformAsync(
+ QueryTaskFuture<T1> future,
+ final Function<T1, QueryTaskFuture<T2>> function) {
+ return QueryTaskFutureImpl.ofDelegate(
+ Futures.transformAsync(
+ (QueryTaskFutureImpl<T1>) future,
+ new AsyncFunction<T1, T2>() {
+ @Override
+ public ListenableFuture<T2> apply(T1 input) {
+ return (QueryTaskFutureImpl<T2>) function.apply(input);
+ }
+ },
+ executor));
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> whenAllSucceedCall(
+ Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) {
+ return QueryTaskFutureImpl.ofDelegate(
+ Futures.whenAllSucceed(cast(futures)).call(callable, executor));
}
@ThreadSafe
@Override
- public ThreadSafeUniquifier<Target> createUniquifier() {
+ public Uniquifier<Target> createUniquifier() {
return createTargetUniquifier();
}
@ThreadSafe
- ThreadSafeUniquifier<Target> createTargetUniquifier() {
- return new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
+ Uniquifier<Target> createTargetUniquifier() {
+ return new TargetUniquifier(DEFAULT_THREAD_COUNT);
}
@ThreadSafe
- ThreadSafeUniquifier<SkyKey> createSkyKeyUniquifier() {
- return new ThreadSafeSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
+ Uniquifier<SkyKey> createSkyKeyUniquifier() {
+ return new SkyKeyUniquifier(DEFAULT_THREAD_COUNT);
}
@ThreadSafe
- ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
- return new ThreadSafeReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
+ Uniquifier<Pair<SkyKey, SkyKey>> createReverseDepSkyKeyUniquifier() {
+ return new ReverseDepSkyKeyUniquifier(DEFAULT_THREAD_COUNT);
}
private Pair<TargetPattern, ImmutableSet<PathFragment>> getPatternAndExcludes(String pattern)
@@ -613,41 +669,44 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
@ThreadSafe
@Override
- public void getTargetsMatchingPattern(
- QueryExpression owner, String pattern, Callback<Target> callback)
- throws QueryException, InterruptedException {
+ public QueryTaskFuture<Void> getTargetsMatchingPattern(
+ final QueryExpression owner, String pattern, Callback<Target> callback) {
// Directly evaluate the target pattern, making use of packages in the graph.
+ Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude;
try {
- Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
- getPatternAndExcludes(pattern);
- TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
- ImmutableSet<PathFragment> subdirectoriesToExclude =
- patternToEvalAndSubdirectoriesToExclude.getSecond();
- patternToEval.eval(resolver, subdirectoriesToExclude, callback, QueryException.class);
- } catch (TargetParsingException e) {
- reportBuildFileError(owner, e.getMessage());
- }
- }
-
- @Override
- public void getTargetsMatchingPatternPar(
- QueryExpression owner,
- String pattern,
- ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- // Directly evaluate the target pattern, making use of packages in the graph.
- try {
- Pair<TargetPattern, ImmutableSet<PathFragment>> patternToEvalAndSubdirectoriesToExclude =
- getPatternAndExcludes(pattern);
- TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
- ImmutableSet<PathFragment> subdirectoriesToExclude =
- patternToEvalAndSubdirectoriesToExclude.getSecond();
- patternToEval.parEval(
- resolver, subdirectoriesToExclude, callback, QueryException.class, forkJoinPool);
- } catch (TargetParsingException e) {
- reportBuildFileError(owner, e.getMessage());
+ patternToEvalAndSubdirectoriesToExclude = getPatternAndExcludes(pattern);
+ } catch (TargetParsingException tpe) {
+ try {
+ reportBuildFileError(owner, tpe.getMessage());
+ } catch (QueryException qe) {
+ return immediateFailedFuture(qe);
+ }
+ return immediateSuccessfulFuture(null);
+ } catch (InterruptedException ie) {
+ return immediateCancelledFuture();
}
+ TargetPattern patternToEval = patternToEvalAndSubdirectoriesToExclude.getFirst();
+ ImmutableSet<PathFragment> subdirectoriesToExclude =
+ patternToEvalAndSubdirectoriesToExclude.getSecond();
+ AsyncFunction<TargetParsingException, Void> reportBuildFileErrorAsyncFunction =
+ new AsyncFunction<TargetParsingException, Void>() {
+ @Override
+ public ListenableFuture<Void> apply(TargetParsingException exn) throws QueryException {
+ reportBuildFileError(owner, exn.getMessage());
+ return Futures.immediateFuture(null);
+ }
+ };
+ ListenableFuture<Void> evalFuture = patternToEval.evalAsync(
+ resolver,
+ subdirectoriesToExclude,
+ callback,
+ QueryException.class,
+ executor);
+ return QueryTaskFutureImpl.ofDelegate(
+ Futures.catchingAsync(
+ evalFuture,
+ TargetParsingException.class,
+ reportBuildFileErrorAsyncFunction));
}
@ThreadSafe
@@ -1018,12 +1077,17 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
@ThreadSafe
- void getRBuildFilesParallel(
- Collection<PathFragment> fileIdentifiers,
- ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, packageSemaphore);
+ QueryTaskFuture<Void> getRBuildFilesParallel(
+ final Collection<PathFragment> fileIdentifiers,
+ final Callback<Target> callback) {
+ return QueryTaskFutureImpl.ofDelegate(safeSubmit(new Callable<Void>() {
+ @Override
+ public Void call() throws QueryException, InterruptedException {
+ ParallelSkyQueryUtils.getRBuildFilesParallel(
+ SkyQueryEnvironment.this, fileIdentifiers, callback, packageSemaphore);
+ return null;
+ }
+ }));
}
/**
@@ -1031,41 +1095,50 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
* on the given list of BUILD files and subincludes (other files are filtered out).
*/
@ThreadSafe
- void getRBuildFiles(Collection<PathFragment> fileIdentifiers, Callback<Target> callback)
- throws QueryException, InterruptedException {
- Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
- Uniquifier<SkyKey> keyUniquifier = new ThreadSafeSkyKeyUniquifier(/*concurrencyLevel=*/ 1);
- Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
- Set<SkyKey> resultKeys = CompactHashSet.create();
- while (!current.isEmpty()) {
- Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values();
- current = new HashSet<>();
- for (SkyKey rdep : Iterables.concat(reverseDeps)) {
- if (rdep.functionName().equals(SkyFunctions.PACKAGE)) {
- resultKeys.add(rdep);
- // Every package has a dep on the external package, so we need to include those edges too.
- if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
+ QueryTaskFuture<Void> getRBuildFiles(
+ Collection<PathFragment> fileIdentifiers, Callback<Target> callback) {
+ try {
+ Collection<SkyKey> files = getSkyKeysForFileFragments(fileIdentifiers);
+ Uniquifier<SkyKey> keyUniquifier = new SkyKeyUniquifier(/*concurrencyLevel=*/ 1);
+ Collection<SkyKey> current = keyUniquifier.unique(graph.getSuccessfulValues(files).keySet());
+ Set<SkyKey> resultKeys = CompactHashSet.create();
+ while (!current.isEmpty()) {
+ Collection<Iterable<SkyKey>> reverseDeps = graph.getReverseDeps(current).values();
+ current = new HashSet<>();
+ for (SkyKey rdep : Iterables.concat(reverseDeps)) {
+ if (rdep.functionName().equals(SkyFunctions.PACKAGE)) {
+ resultKeys.add(rdep);
+ // Every package has a dep on the external package, so we need to include those edges
+ // too.
+ if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) {
+ if (keyUniquifier.unique(rdep)) {
+ current.add(rdep);
+ }
+ }
+ } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) {
+ // Packages may depend on the existence of subpackages, but these edges aren't relevant
+ // to rbuildfiles.
if (keyUniquifier.unique(rdep)) {
current.add(rdep);
}
}
- } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) {
- // Packages may depend on the existence of subpackages, but these edges aren't relevant to
- // rbuildfiles.
- if (keyUniquifier.unique(rdep)) {
- current.add(rdep);
- }
}
- }
- if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
- for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
- callback.process(
- getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values()));
+ if (resultKeys.size() >= BATCH_CALLBACK_SIZE) {
+ for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) {
+ callback.process(
+ getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values()));
+ }
+ resultKeys.clear();
}
- resultKeys.clear();
}
+ callback.process(
+ getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values()));
+ return immediateSuccessfulFuture(null);
+ } catch (QueryException e) {
+ return immediateFailedFuture(e);
+ } catch (InterruptedException e) {
+ return immediateCancelledFuture();
}
- callback.process(getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values()));
}
@Override
@@ -1093,9 +1166,8 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
}
- private static class ThreadSafeTargetUniquifier
- extends AbstractThreadSafeUniquifier<Target, Label> {
- protected ThreadSafeTargetUniquifier(int concurrencyLevel) {
+ private static class TargetUniquifier extends AbstractUniquifier<Target, Label> {
+ protected TargetUniquifier(int concurrencyLevel) {
super(concurrencyLevel);
}
@@ -1105,9 +1177,8 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
}
- private static class ThreadSafeSkyKeyUniquifier
- extends AbstractThreadSafeUniquifier<SkyKey, SkyKey> {
- protected ThreadSafeSkyKeyUniquifier(int concurrencyLevel) {
+ private static class SkyKeyUniquifier extends AbstractUniquifier<SkyKey, SkyKey> {
+ protected SkyKeyUniquifier(int concurrencyLevel) {
super(concurrencyLevel);
}
@@ -1121,9 +1192,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
* A uniquifer which takes a pair of parent and reverse dep, and uniquify based on the second
* element (reverse dep).
*/
- private static class ThreadSafeReverseDepSkyKeyUniquifier
- extends AbstractThreadSafeUniquifier<Pair<SkyKey, SkyKey>, SkyKey> {
- protected ThreadSafeReverseDepSkyKeyUniquifier(int concurrencyLevel) {
+ private static class ReverseDepSkyKeyUniquifier
+ extends AbstractUniquifier<Pair<SkyKey, SkyKey>, SkyKey> {
+ protected ReverseDepSkyKeyUniquifier(int concurrencyLevel) {
super(concurrencyLevel);
}
@@ -1146,18 +1217,25 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
* <p>This callback may be called from multiple threads concurrently. At most one thread will call
* the wrapped {@code callback} concurrently.
*/
- @ThreadSafe
- private static class BatchStreamedCallback extends OutputFormatterCallback<Target>
- implements ThreadSafeCallback<Target> {
-
- private final OutputFormatterCallback<Target> callback;
- private final ThreadSafeUniquifier<Target> uniquifier =
- new ThreadSafeTargetUniquifier(DEFAULT_THREAD_COUNT);
+ // TODO(nharmata): For queries with less than {@code batchThreshold} results, this batching
+ // strategy probably hurts performance since we can only start formatting results once the entire
+ // query is finished.
+ private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback<Target>
+ implements Callback<Target> {
+
+ // TODO(nharmata): Now that we know the wrapped callback is ThreadSafe, there's no correctness
+ // concern that requires the prohibition of concurrent uses of the callback; the only concern is
+ // memory. We should have a threshold for when to invoke the callback with a batch, and also a
+ // separate, larger, bound on the number of targets being processed at the same time.
+ private final ThreadSafeOutputFormatterCallback<Target> callback;
+ private final Uniquifier<Target> uniquifier = new TargetUniquifier(DEFAULT_THREAD_COUNT);
private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
- private BatchStreamedCallback(OutputFormatterCallback<Target> callback, int batchThreshold) {
+ private BatchStreamedCallback(
+ ThreadSafeOutputFormatterCallback<Target> callback,
+ int batchThreshold) {
this.callback = callback;
this.batchThreshold = batchThreshold;
}
@@ -1201,26 +1279,23 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
@ThreadSafe
@Override
- public void getAllRdepsUnboundedParallel(
+ public QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
QueryExpression expression,
VariableContext<Target> context,
- ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
+ Callback<Target> callback) {
+ return ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
this, expression, context, callback, packageSemaphore);
}
@ThreadSafe
@Override
- public void getAllRdeps(
+ public QueryTaskFuture<Void> getAllRdeps(
QueryExpression expression,
Predicate<Target> universe,
VariableContext<Target> context,
Callback<Target> callback,
- int depth)
- throws QueryException, InterruptedException {
- getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
+ int depth) {
+ return getAllRdeps(expression, universe, context, callback, depth, BATCH_CALLBACK_SIZE);
}
/**
@@ -1230,16 +1305,15 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
* nodes are directly depended on by a large number of other nodes.
*/
@VisibleForTesting
- protected void getAllRdeps(
+ protected QueryTaskFuture<Void> getAllRdeps(
QueryExpression expression,
Predicate<Target> universe,
VariableContext<Target> context,
Callback<Target> callback,
int depth,
- int batchSize)
- throws QueryException, InterruptedException {
+ int batchSize) {
Uniquifier<Target> uniquifier = createUniquifier();
- eval(
+ return eval(
expression,
context,
new BatchAllRdepsCallback(uniquifier, universe, callback, depth, batchSize));
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java
new file mode 100644
index 0000000000..62fd91b56f
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java
@@ -0,0 +1,194 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+import com.google.devtools.build.lib.util.Preconditions;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * A partial implementation of {@link QueryEnvironment} that has trivial in-thread implementations
+ * of all the {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods.
+ */
+public abstract class AbstractQueryEnvironment<T> implements QueryEnvironment<T> {
+ /** Concrete implementation of {@link QueryTaskFuture}. */
+ protected static final class QueryTaskFutureImpl<T>
+ extends QueryTaskFutureImplBase<T> implements ListenableFuture<T> {
+ private final ListenableFuture<T> delegate;
+
+ private QueryTaskFutureImpl(ListenableFuture<T> delegate) {
+ this.delegate = delegate;
+ }
+
+ public static <R> QueryTaskFutureImpl<R> ofDelegate(ListenableFuture<R> delegate) {
+ return (delegate instanceof QueryTaskFutureImpl)
+ ? (QueryTaskFutureImpl<R>) delegate
+ : new QueryTaskFutureImpl<>(delegate);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return delegate.cancel(mayInterruptIfRunning);
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return delegate.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return delegate.isDone();
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ return delegate.get();
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return delegate.get(timeout, unit);
+ }
+
+ @Override
+ public void addListener(Runnable listener, Executor executor) {
+ delegate.addListener(listener, executor);
+ }
+
+ @Override
+ public T getIfSuccessful() {
+ Preconditions.checkState(delegate.isDone());
+ try {
+ return delegate.get();
+ } catch (CancellationException | InterruptedException | ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public T getChecked() throws InterruptedException, QueryException {
+ try {
+ return get();
+ } catch (CancellationException e) {
+ throw new InterruptedException();
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, QueryException.class);
+ Throwables.propagateIfPossible(cause, InterruptedException.class);
+ throw new IllegalStateException(e.getCause());
+ }
+ }
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> immediateSuccessfulFuture(R value) {
+ return new QueryTaskFutureImpl<>(Futures.immediateFuture(value));
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> immediateFailedFuture(QueryException e) {
+ return new QueryTaskFutureImpl<>(Futures.<R>immediateFailedFuture(e));
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> immediateCancelledFuture() {
+ return new QueryTaskFutureImpl<>(Futures.<R>immediateCancelledFuture());
+ }
+
+ @Override
+ public QueryTaskFuture<Void> eval(
+ QueryExpression expr, VariableContext<T> context, Callback<T> callback) {
+ return expr.eval(this, context, callback);
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) {
+ try {
+ return immediateSuccessfulFuture(callable.call());
+ } catch (QueryException e) {
+ return immediateFailedFuture(e);
+ } catch (InterruptedException e) {
+ return immediateCancelledFuture();
+ }
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> whenSucceedsCall(
+ QueryTaskFuture<?> future, QueryTaskCallable<R> callable) {
+ return whenAllSucceedCall(ImmutableList.of(future), callable);
+ }
+
+ private static class Dummy implements QueryTaskCallable<Void> {
+ public static final Dummy INSTANCE = new Dummy();
+
+ private Dummy() {}
+
+ @Override
+ public Void call() {
+ return null;
+ }
+ }
+
+ @Override
+ public QueryTaskFuture<Void> whenAllSucceed(Iterable<? extends QueryTaskFuture<?>> futures) {
+ return whenAllSucceedCall(futures, Dummy.INSTANCE);
+ }
+
+ @Override
+ public <R> QueryTaskFuture<R> whenAllSucceedCall(
+ Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) {
+ return QueryTaskFutureImpl.ofDelegate(
+ Futures.whenAllSucceed(cast(futures)).call(callable));
+ }
+
+ @Override
+ public <T1, T2> QueryTaskFuture<T2> transformAsync(
+ QueryTaskFuture<T1> future,
+ final Function<T1, QueryTaskFuture<T2>> function) {
+ return QueryTaskFutureImpl.ofDelegate(
+ Futures.transformAsync(
+ (QueryTaskFutureImpl<T1>) future,
+ new AsyncFunction<T1, T2>() {
+ @Override
+ public ListenableFuture<T2> apply(T1 input) throws Exception {
+ return (QueryTaskFutureImpl<T2>) function.apply(input);
+ }
+ }));
+ }
+
+ protected static Iterable<QueryTaskFutureImpl<?>> cast(
+ Iterable<? extends QueryTaskFuture<?>> futures) {
+ return Iterables.transform(
+ futures,
+ new Function<QueryTaskFuture<?>, QueryTaskFutureImpl<?>>() {
+ @Override
+ public QueryTaskFutureImpl<?> apply(QueryTaskFuture<?> future) {
+ return (QueryTaskFutureImpl<?>) future;
+ }
+ });
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
index adc12d278f..81be4c8ca2 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
@@ -21,12 +21,12 @@ import com.google.common.collect.Sets;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* Implementation of the <code>allpaths()</code> function.
@@ -51,46 +51,47 @@ public class AllPathsFunction implements QueryFunction {
}
@Override
- public <T> void eval(
- QueryEnvironment<T> env,
+ public <T> QueryTaskFuture<Void> eval(
+ final QueryEnvironment<T> env,
VariableContext<T> context,
- QueryExpression expression,
+ final QueryExpression expression,
List<Argument> args,
- Callback<T> callback) throws QueryException, InterruptedException {
-
- Set<T> fromValue = QueryUtil.evalAll(env, context, args.get(0).getExpression());
- Set<T> toValue = QueryUtil.evalAll(env, context, args.get(1).getExpression());
+ final Callback<T> callback) {
+ final QueryTaskFuture<Set<T>> fromValueFuture =
+ QueryUtil.evalAll(env, context, args.get(0).getExpression());
+ final QueryTaskFuture<Set<T>> toValueFuture =
+ QueryUtil.evalAll(env, context, args.get(1).getExpression());
- // Algorithm: compute "reachableFromX", the forward transitive closure of
- // the "from" set, then find the intersection of "reachableFromX" with the
- // reverse transitive closure of the "to" set. The reverse transitive
- // closure and intersection operations are interleaved for efficiency.
- // "result" holds the intersection.
+ return env.whenAllSucceedCall(
+ ImmutableList.of(fromValueFuture, toValueFuture),
+ new QueryTaskCallable<Void>() {
+ @Override
+ public Void call() throws QueryException, InterruptedException {
+ // Algorithm: compute "reachableFromX", the forward transitive closure of
+ // the "from" set, then find the intersection of "reachableFromX" with the
+ // reverse transitive closure of the "to" set. The reverse transitive
+ // closure and intersection operations are interleaved for efficiency.
+ // "result" holds the intersection.
- env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
+ Set<T> fromValue = fromValueFuture.getIfSuccessful();
+ Set<T> toValue = toValueFuture.getIfSuccessful();
- Set<T> reachableFromX = env.getTransitiveClosure(fromValue);
- Predicate<T> reachable = Predicates.in(reachableFromX);
- Uniquifier<T> uniquifier = env.createUniquifier();
- Collection<T> result = uniquifier.unique(intersection(reachableFromX, toValue));
- callback.process(result);
- Collection<T> worklist = result;
- while (!worklist.isEmpty()) {
- Collection<T> reverseDeps = env.getReverseDeps(worklist);
- worklist = uniquifier.unique(Iterables.filter(reverseDeps, reachable));
- callback.process(worklist);
- }
- }
+ env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
- @Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- eval(env, context, expression, args, callback);
+ Set<T> reachableFromX = env.getTransitiveClosure(fromValue);
+ Predicate<T> reachable = Predicates.in(reachableFromX);
+ Uniquifier<T> uniquifier = env.createUniquifier();
+ Collection<T> result = uniquifier.unique(intersection(reachableFromX, toValue));
+ callback.process(result);
+ Collection<T> worklist = result;
+ while (!worklist.isEmpty()) {
+ Collection<T> reverseDeps = env.getReverseDeps(worklist);
+ worklist = uniquifier.unique(Iterables.filter(reverseDeps, reachable));
+ callback.process(worklist);
+ }
+ return null;
+ }
+ });
}
/**
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
index 518b67497b..f800d804dc 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
@@ -20,10 +21,9 @@ import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ForkJoinPool;
/**
* An "allrdeps" query expression, which computes the reverse dependencies of the argument within
@@ -52,30 +52,34 @@ public class AllRdepsFunction implements QueryFunction {
}
@Override
- public <T> void eval(
+ public <T> QueryTaskFuture<Void> eval(
QueryEnvironment<T> env,
VariableContext<T> context,
QueryExpression expression,
List<Argument> args,
- Callback<T> callback) throws QueryException, InterruptedException {
- eval(env, context, args, callback, Predicates.<T>alwaysTrue());
+ Callback<T> callback) {
+ return eval(env, context, args, callback, Optional.<Predicate<T>>absent());
}
- protected <T> void eval(
+ protected <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final List<Argument> args,
final Callback<T> callback,
- final Predicate<T> universe)
- throws QueryException, InterruptedException {
-
+ Optional<Predicate<T>> universeMaybe) {
final int depth = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
+ final Predicate<T> universe = universeMaybe.isPresent()
+ ? universeMaybe.get()
+ : Predicates.<T>alwaysTrue();
if (env instanceof StreamableQueryEnvironment<?>) {
- ((StreamableQueryEnvironment<T>) env)
- .getAllRdeps(args.get(0).getExpression(), universe, context, callback, depth);
+ StreamableQueryEnvironment<T> streamableEnv = ((StreamableQueryEnvironment<T>) env);
+ return depth == Integer.MAX_VALUE && !universeMaybe.isPresent()
+ ? streamableEnv.getAllRdepsUnboundedParallel(args.get(0).getExpression(), context, callback)
+ : streamableEnv.getAllRdeps(
+ args.get(0).getExpression(), universe, context, callback, depth);
} else {
final Uniquifier<T> uniquifier = env.createUniquifier();
- env.eval(
+ return env.eval(
args.get(0).getExpression(),
context,
new Callback<T>() {
@@ -103,21 +107,4 @@ public class AllRdepsFunction implements QueryFunction {
});
}
}
-
- @Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- boolean unbounded = args.size() == 1;
- if (unbounded && env instanceof StreamableQueryEnvironment<?>) {
- ((StreamableQueryEnvironment<T>) env).getAllRdepsUnboundedParallel(
- args.get(0).getExpression(), context, callback, forkJoinPool);
- } else {
- eval(env, context, expression, args, callback);
- }
- }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
index 89374d0a94..f9d20dbb19 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
@@ -13,16 +13,16 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
-import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind;
-import com.google.devtools.build.lib.query2.engine.ParallelQueryUtils.QueryTask;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.util.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* A binary algebraic set operation.
@@ -56,40 +56,84 @@ public class BinaryOperatorExpression extends QueryExpression {
}
@Override
- protected <T> void evalImpl(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
- throws QueryException, InterruptedException {
+ public <T> QueryTaskFuture<Void> eval(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
+ switch (operator) {
+ case PLUS:
+ case UNION:
+ return evalPlus(operands, env, context, callback);
+ case MINUS:
+ case EXCEPT:
+ return evalMinus(operands, env, context, callback);
+ case INTERSECT:
+ case CARET:
+ return evalIntersect(env, context, callback);
+ default:
+ throw new IllegalStateException(operator.toString());
+ }
+ }
- if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
- for (QueryExpression operand : operands) {
- env.eval(operand, context, callback);
- }
- return;
+ /**
+ * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions
+ * separately.
+ *
+ * <p>N.B. {@code operands.size()} may be {@code 1}.
+ */
+ private static <T> QueryTaskFuture<Void> evalPlus(
+ ImmutableList<QueryExpression> operands,
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ Callback<T> callback) {
+ ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(operands.size());
+ for (QueryExpression operand : operands) {
+ queryTasks.add(env.eval(operand, context, callback));
}
+ return env.whenAllSucceed(queryTasks);
+ }
- // Once we have fully evaluated the left-hand side, we can stream-process the right-hand side
- // for minus operations. Note that this is suboptimal if the left-hand side results are very
- // large compared to the right-hand side. Which is the case is hard to know before evaluating.
- // We could consider determining this dynamically, however, by evaluating both the left and
- // right hand side partially until one side finishes sooner.
- final Set<T> lhsValue = QueryUtil.evalAll(env, context, operands.get(0));
- if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) {
- for (int i = 1; i < operands.size(); i++) {
- env.eval(operands.get(i), context,
- new Callback<T>() {
+ /**
+ * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to
+ * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side separately.
+ */
+ private static <T> QueryTaskFuture<Void> evalMinus(
+ final ImmutableList<QueryExpression> operands,
+ final QueryEnvironment<T> env,
+ final VariableContext<T> context,
+ final Callback<T> callback) {
+ QueryTaskFuture<Set<T>> lhsValueFuture = QueryUtil.evalAll(env, context, operands.get(0));
+ Function<Set<T>, QueryTaskFuture<Void>> substractAsyncFunction =
+ new Function<Set<T>, QueryTaskFuture<Void>>() {
+ @Override
+ public QueryTaskFuture<Void> apply(Set<T> lhsValue) {
+ final Set<T> threadSafeLhsValue = Sets.newConcurrentHashSet(lhsValue);
+ Callback<T> subtractionCallback = new Callback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult) {
+ for (T target : partialResult) {
+ threadSafeLhsValue.remove(target);
+ }
+ }
+ };
+ QueryTaskFuture<Void> rhsEvaluatedFuture = evalPlus(
+ operands.subList(1, operands.size()), env, context, subtractionCallback);
+ return env.whenSucceedsCall(
+ rhsEvaluatedFuture,
+ new QueryTaskCallable<Void>() {
@Override
- public void process(Iterable<T> partialResult)
- throws QueryException, InterruptedException {
- for (T target : partialResult) {
- lhsValue.remove(target);
- }
+ public Void call() throws QueryException, InterruptedException {
+ callback.process(threadSafeLhsValue);
+ return null;
}
});
}
- callback.process(lhsValue);
- return;
- }
+ };
+ return env.transformAsync(lhsValueFuture, substractAsyncFunction);
+ }
+ private <T> QueryTaskFuture<Void> evalIntersect(
+ final QueryEnvironment<T> env,
+ final VariableContext<T> context,
+ final Callback<T> callback) {
// For each right-hand side operand, intersection cannot be performed in a streaming manner; the
// entire result of that operand is needed. So, in order to avoid pinning too much in memory at
// once, we process each right-hand side operand one at a time and throw away that operand's
@@ -97,77 +141,39 @@ public class BinaryOperatorExpression extends QueryExpression {
// TODO(bazel-team): Consider keeping just the name / label of the right-hand side results
// instead of the potentially heavy-weight instances of type T. This would let us process all
// right-hand side operands in parallel without worrying about memory usage.
- Preconditions.checkState(operator == TokenKind.INTERSECT || operator == TokenKind.CARET,
- operator);
+ QueryTaskFuture<Set<T>> rollingResultFuture = QueryUtil.evalAll(env, context, operands.get(0));
for (int i = 1; i < operands.size(); i++) {
- lhsValue.retainAll(QueryUtil.evalAll(env, context, operands.get(i)));
+ final int index = i;
+ Function<Set<T>, QueryTaskFuture<Set<T>>> evalOperandAndIntersectAsyncFunction =
+ new Function<Set<T>, QueryTaskFuture<Set<T>>>() {
+ @Override
+ public QueryTaskFuture<Set<T>> apply(final Set<T> rollingResult) {
+ final QueryTaskFuture<Set<T>> rhsOperandValueFuture =
+ QueryUtil.evalAll(env, context, operands.get(index));
+ return env.whenSucceedsCall(
+ rhsOperandValueFuture,
+ new QueryTaskCallable<Set<T>>() {
+ @Override
+ public Set<T> call() throws QueryException, InterruptedException {
+ rollingResult.retainAll(rhsOperandValueFuture.getIfSuccessful());
+ return rollingResult;
+ }
+ });
+ }
+ };
+ rollingResultFuture =
+ env.transformAsync(rollingResultFuture, evalOperandAndIntersectAsyncFunction);
}
- callback.process(lhsValue);
- }
-
- @Override
- protected <T> void parEvalImpl(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
- parEvalPlus(operands, env, context, callback, forkJoinPool);
- } else if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) {
- parEvalMinus(operands, env, context, callback, forkJoinPool);
- } else {
- evalImpl(env, context, callback);
- }
- }
-
- /**
- * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions
- * in parallel.
- */
- private static <T> void parEvalPlus(
- ImmutableList<QueryExpression> operands,
- final QueryEnvironment<T> env,
- final VariableContext<T> context,
- final ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- ArrayList<QueryTask> queryTasks = new ArrayList<>(operands.size());
- for (final QueryExpression operand : operands) {
- queryTasks.add(new QueryTask() {
- @Override
- public void execute() throws QueryException, InterruptedException {
- env.eval(operand, context, callback);
- }
- });
- }
- ParallelQueryUtils.executeQueryTasksAndWaitInterruptiblyFailFast(queryTasks, forkJoinPool);
- }
-
- /**
- * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to
- * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side in parallel.
- */
- private static <T> void parEvalMinus(
- ImmutableList<QueryExpression> operands,
- QueryEnvironment<T> env,
- VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- final Set<T> lhsValue =
- Sets.newConcurrentHashSet(QueryUtil.evalAll(env, context, operands.get(0)));
- ThreadSafeCallback<T> subtractionCallback = new ThreadSafeCallback<T>() {
- @Override
- public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
- for (T target : partialResult) {
- lhsValue.remove(target);
- }
- }
- };
- parEvalPlus(
- operands.subList(1, operands.size()), env, context, subtractionCallback, forkJoinPool);
- callback.process(lhsValue);
+ final QueryTaskFuture<Set<T>> resultFuture = rollingResultFuture;
+ return env.whenSucceedsCall(
+ resultFuture,
+ new QueryTaskCallable<Void>() {
+ @Override
+ public Void call() throws QueryException, InterruptedException {
+ callback.process(resultFuture.getIfSuccessful());
+ return null;
+ }
+ });
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
index d2a2eb0fa8..cbc0ae8d38 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
@@ -19,10 +19,9 @@ import com.google.devtools.build.lib.collect.CompactHashSet;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* A buildfiles(x) query expression, which computes the set of BUILD files and
@@ -42,18 +41,17 @@ class BuildFilesFunction implements QueryFunction {
}
@Override
- public <T> void eval(
+ public <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final QueryExpression expression,
List<Argument> args,
- final Callback<T> callback)
- throws QueryException, InterruptedException {
+ final Callback<T> callback) {
final Uniquifier<T> uniquifier = env.createUniquifier();
- env.eval(
+ return env.eval(
args.get(0).getExpression(),
context,
- new ThreadSafeCallback<T>() {
+ new Callback<T>() {
@Override
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
@@ -67,18 +65,6 @@ class BuildFilesFunction implements QueryFunction {
}
@Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- // 'eval' is written in such a way that it enables parallel evaluation of 'expression'.
- eval(env, context, expression, args, callback);
- }
-
- @Override
public int getMandatoryArguments() {
return 1;
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java
index 0f4321145d..51c51fa99b 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java
@@ -13,14 +13,17 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.BatchCallback;
+import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
/**
* Query callback to be called by a {@link QueryExpression} when it has part of the computation
* result. Assuming the {@code QueryEnvironment} supports it, it would allow the caller
* to stream the results.
*/
-public interface Callback<T> extends BatchCallback<T, QueryException> {
+@ThreadSafe
+public interface Callback<T> extends ThreadSafeBatchCallback<T, QueryException> {
/**
* According to the {@link BatchCallback} interface, repeated elements may be passed in here.
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
index 5eca701702..7317a35028 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
@@ -18,10 +18,10 @@ import com.google.common.collect.Sets;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* A "deps" query expression, which computes the dependencies of the argument. An optional
@@ -53,15 +53,15 @@ final class DepsFunction implements QueryFunction {
* Breadth-first search from the arguments.
*/
@Override
- public <T> void eval(
+ public <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) throws QueryException, InterruptedException {
+ final Callback<T> callback) {
final int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
final Uniquifier<T> uniquifier = env.createUniquifier();
- env.eval(args.get(0).getExpression(), context, new Callback<T>() {
+ return env.eval(args.get(0).getExpression(), context, new Callback<T>() {
@Override
public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
Collection<T> current = Sets.newHashSet(partialResult);
@@ -83,15 +83,4 @@ final class DepsFunction implements QueryFunction {
}
});
}
-
- @Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- eval(env, context, expression, args, callback);
- }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java
index a31196aba5..85cfe9fddb 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java
@@ -20,10 +20,9 @@ import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.ForkJoinPool;
/**
* A query expression for user-defined query functions.
@@ -46,19 +45,9 @@ public class FunctionExpression extends QueryExpression {
}
@Override
- protected <T> void evalImpl(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
- throws QueryException, InterruptedException {
- function.eval(env, context, this, args, callback);
- }
-
- @Override
- protected <T> void parEvalImpl(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- function.parEval(env, context, this, args, callback, forkJoinPool);
+ public <T> QueryTaskFuture<Void> eval(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
+ return function.eval(env, context, this, args, callback);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
index 4fa428adb5..1d68573578 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
@@ -17,9 +17,9 @@ import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ForkJoinPool;
/**
* A label(attr_name, argument) expression, which computes the set of targets
@@ -52,16 +52,15 @@ class LabelsFunction implements QueryFunction {
}
@Override
- public <T> void eval(
+ public <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final QueryExpression expression,
final List<Argument> args,
- final Callback<T> callback)
- throws QueryException, InterruptedException {
+ final Callback<T> callback) {
final String attrName = args.get(0).getWord();
final Uniquifier<T> uniquifier = env.createUniquifier();
- env.eval(args.get(1).getExpression(), context, new Callback<T>() {
+ return env.eval(args.get(1).getExpression(), context, new Callback<T>() {
@Override
public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
for (T input : partialResult) {
@@ -80,15 +79,4 @@ class LabelsFunction implements QueryFunction {
}
});
}
-
- @Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- eval(env, context, expression, args, callback);
- }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java
index 64d94da19a..a7c3abeb62 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java
@@ -13,6 +13,8 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.common.base.Function;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.Collection;
import java.util.Set;
import java.util.regex.Pattern;
@@ -64,15 +66,24 @@ class LetExpression extends QueryExpression {
}
@Override
- protected <T> void evalImpl(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
- throws QueryException, InterruptedException {
+ public <T> QueryTaskFuture<Void> eval(
+ final QueryEnvironment<T> env,
+ final VariableContext<T> context,
+ final Callback<T> callback) {
if (!NAME_PATTERN.matcher(varName).matches()) {
- throw new QueryException(this, "invalid variable name '" + varName + "' in let expression");
+ return env.immediateFailedFuture(
+ new QueryException(this, "invalid variable name '" + varName + "' in let expression"));
}
- Set<T> varValue = QueryUtil.evalAll(env, context, varExpr);
- VariableContext<T> bodyContext = VariableContext.with(context, varName, varValue);
- env.eval(bodyExpr, bodyContext, callback);
+ QueryTaskFuture<Set<T>> varValueFuture = QueryUtil.evalAll(env, context, varExpr);
+ Function<Set<T>, QueryTaskFuture<Void>> evalBodyAsyncFunction =
+ new Function<Set<T>, QueryTaskFuture<Void>>() {
+ @Override
+ public QueryTaskFuture<Void> apply(Set<T> varValue) {
+ VariableContext<T> bodyContext = VariableContext.with(context, varName, varValue);
+ return env.eval(bodyExpr, bodyContext, callback);
+ }
+ };
+ return env.transformAsync(varValueFuture, evalBodyAsyncFunction);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
index 311a6afff5..80b912f6d7 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
@@ -16,10 +16,9 @@ package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* A loadfiles(x) query expression, which computes the set of .bzl files
@@ -38,15 +37,14 @@ class LoadFilesFunction implements QueryEnvironment.QueryFunction {
}
@Override
- public <T> void eval(
+ public <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final QueryExpression expression,
List<QueryEnvironment.Argument> args,
- final Callback<T> callback)
- throws QueryException, InterruptedException {
+ final Callback<T> callback) {
final Uniquifier<T> uniquifier = env.createUniquifier();
- env.eval(
+ return env.eval(
args.get(0).getExpression(),
context,
new Callback<T>() {
@@ -67,17 +65,6 @@ class LoadFilesFunction implements QueryEnvironment.QueryFunction {
}
@Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- eval(env, context, expression, args, callback);
- }
-
- @Override
public int getMandatoryArguments() {
return 1;
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java
index 5d21c874be..50708d6816 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java
@@ -46,7 +46,7 @@ public abstract class OutputFormatterCallback<T> implements Callback<T> {
* disambiguate between real interruptions or IO Exceptions.
*/
@Override
- public final void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
+ public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
try {
processOutput(partialResult);
} catch (IOException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
deleted file mode 100644
index 6e22709deb..0000000000
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
+++ /dev/null
@@ -1,188 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.query2.engine;
-
-import com.google.common.collect.Iterables;
-import com.google.devtools.build.lib.concurrent.MoreFutures;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.Future;
-
-/** Several utilities to aid in writing {@link QueryExpression#parEvalImpl} implementations. */
-public class ParallelQueryUtils {
- /**
- * Encapsulation of a subtask of parallel evaluation of a {@link QueryExpression}. See
- * {@link #executeQueryTasksAndWaitInterruptiblyFailFast}.
- */
- @ThreadSafe
- public interface QueryTask {
- void execute() throws QueryException, InterruptedException;
- }
-
- /**
- * Executes the given {@link QueryTask}s using the given {@link ForkJoinPool} and interruptibly
- * waits for their completion. Throws the first {@link QueryException} encountered during parallel
- * execution or an {@link InterruptedException} if the calling thread is interrupted.
- *
- * <p>These "fail-fast" semantics are desirable to avoid doing unneeded work when evaluating
- * multiple {@link QueryTask}s in parallel: if serial execution of the tasks would result in a
- * {@link QueryException} then we want parallel execution to do so as well, but there's no need to
- * continue waiting for completion of the tasks after at least one of them results in a
- * {@link QueryException}.
- */
- public static void executeQueryTasksAndWaitInterruptiblyFailFast(
- List<QueryTask> queryTasks,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- int numTasks = queryTasks.size();
- if (numTasks == 1) {
- Iterables.getOnlyElement(queryTasks).execute();
- return;
- }
- FailFastCountDownLatch failFastLatch = new FailFastCountDownLatch(numTasks);
- ArrayList<QueryTaskForkJoinTask> forkJoinTasks = new ArrayList<>(numTasks);
- for (QueryTask queryTask : queryTasks) {
- QueryTaskForkJoinTask forkJoinTask = adaptAsForkJoinTask(queryTask, failFastLatch);
- forkJoinTasks.add(forkJoinTask);
- @SuppressWarnings("unused")
- Future<?> possiblyIgnoredError = forkJoinPool.submit(forkJoinTask);
- }
- failFastLatch.await();
- try {
- MoreFutures.waitForAllInterruptiblyFailFast(forkJoinTasks);
- } catch (ExecutionException e) {
- throw rethrowCause(e);
- }
- }
-
- private static QueryTaskForkJoinTask adaptAsForkJoinTask(
- QueryTask queryTask,
- FailFastCountDownLatch failFastLatch) {
- return new QueryTaskForkJoinTask(queryTask, failFastLatch);
- }
-
- private static RuntimeException rethrowCause(ExecutionException e)
- throws QueryException, InterruptedException {
- Throwable cause = e.getCause();
- if (cause instanceof ParallelRuntimeException) {
- ((ParallelRuntimeException) cause).rethrow();
- }
- throw new IllegalStateException(e);
- }
-
- /**
- * Wrapper around a {@link CountDownLatch} with initial count {@code n} that counts down once on
- * "success" and {@code n} times on "failure".
- *
- * <p>This can be used in a concurrent context to wait until either {@code n} tasks are successful
- * or at least one of them fails.
- */
- @ThreadSafe
- private static class FailFastCountDownLatch {
- private final int n;
- private final CountDownLatch completionLatch;
-
- private FailFastCountDownLatch(int n) {
- this.n = n;
- this.completionLatch = new CountDownLatch(n);
- }
-
- private void await() throws InterruptedException {
- completionLatch.await();
- }
-
- private void countDown(boolean success) {
- if (success) {
- completionLatch.countDown();
- } else {
- for (int i = 0; i < n; i++) {
- completionLatch.countDown();
- }
- }
- }
- }
-
- // ForkJoinTask#adapt(Callable) wraps thrown checked exceptions as RuntimeExceptions. We avoid
- // having to think about that messiness (which is inconsistent with other Future implementations)
- // by having our own ForkJoinTask subclass and managing checked exceptions ourselves.
- @ThreadSafe
- private static class QueryTaskForkJoinTask extends ForkJoinTask<Void> {
- private final QueryTask queryTask;
- private final FailFastCountDownLatch completionLatch;
-
- private QueryTaskForkJoinTask(QueryTask queryTask, FailFastCountDownLatch completionLatch) {
- this.queryTask = queryTask;
- this.completionLatch = completionLatch;
- }
-
- @Override
- public Void getRawResult() {
- return null;
- }
-
- @Override
- protected void setRawResult(Void value) {
- }
-
- @Override
- protected boolean exec() {
- boolean successful = false;
- try {
- queryTask.execute();
- successful = true;
- return true;
- } catch (QueryException queryException) {
- throw new ParallelRuntimeQueryException(queryException);
- } catch (InterruptedException interruptedException) {
- throw new ParallelInterruptedQueryException(interruptedException);
- } finally {
- completionLatch.countDown(successful);
- }
- }
- }
-
- private abstract static class ParallelRuntimeException extends RuntimeException {
- abstract void rethrow() throws QueryException, InterruptedException;
- }
-
- private static class ParallelRuntimeQueryException extends ParallelRuntimeException {
- private final QueryException queryException;
-
- private ParallelRuntimeQueryException(QueryException queryException) {
- this.queryException = queryException;
- }
-
- @Override
- void rethrow() throws QueryException, InterruptedException {
- throw queryException;
- }
- }
-
- private static class ParallelInterruptedQueryException extends ParallelRuntimeException {
- private final InterruptedException interruptedException;
-
- private ParallelInterruptedQueryException(InterruptedException interruptedException) {
- this.interruptedException = interruptedException;
- }
-
- @Override
- void rethrow() throws QueryException, InterruptedException {
- throw interruptedException;
- }
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
index 879c9c1ddb..0c11eec8f2 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
@@ -13,11 +13,13 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Callable;
import javax.annotation.Nonnull;
/**
@@ -91,16 +93,14 @@ public interface QueryEnvironment<T> {
/** A user-defined query function. */
interface QueryFunction {
- /**
- * Name of the function as it appears in the query language.
- */
+ /** Name of the function as it appears in the query language. */
String getName();
/**
* The number of arguments that are required. The rest is optional.
*
- * <p>This should be greater than or equal to zero and at smaller than or equal to the length
- * of the list returned by {@link #getArgumentTypes}.
+ * <p>This should be greater than or equal to zero and at smaller than or equal to the length of
+ * the list returned by {@link #getArgumentTypes}.
*/
int getMandatoryArguments();
@@ -108,34 +108,21 @@ public interface QueryEnvironment<T> {
Iterable<ArgumentType> getArgumentTypes();
/**
- * Called when a user-defined function is to be evaluated.
+ * Returns a {@link QueryTaskFuture} representing the asynchronous application of this
+ * {@link QueryFunction} to the given {@code args}, feeding the results to the given
+ * {@code callback}.
*
* @param env the query environment this function is evaluated in.
* @param expression the expression being evaluated.
- * @param args the input arguments. These are type-checked against the specification returned
- * by {@link #getArgumentTypes} and {@link #getMandatoryArguments}
- */
- <T> void eval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- Callback<T> callback) throws QueryException, InterruptedException;
-
- /**
- * Same as {@link #eval(QueryEnvironment, VariableContext, QueryExpression, List, Callback)},
- * except that this {@link QueryFunction} may use {@code forkJoinPool} to achieve
- * parallelism.
- *
- * <p>The caller must ensure that {@code env} is thread safe.
+ * @param args the input arguments. These are type-checked against the specification returned by
+ * {@link #getArgumentTypes} and {@link #getMandatoryArguments}
*/
- <T> void parEval(
+ <T> QueryTaskFuture<Void> eval(
QueryEnvironment<T> env,
VariableContext<T> context,
QueryExpression expression,
List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException;
+ Callback<T> callback);
}
/**
@@ -156,18 +143,8 @@ public interface QueryEnvironment<T> {
* Invokes {@code callback} with the set of target nodes in the graph for the specified target
* pattern, in 'blaze build' syntax.
*/
- void getTargetsMatchingPattern(QueryExpression owner, String pattern, Callback<T> callback)
- throws QueryException, InterruptedException;
-
- /**
- * Same as {@link #getTargetsMatchingPattern}, but optionally making use of the given
- * {@link ForkJoinPool} to achieve parallelism.
- */
- void getTargetsMatchingPatternPar(
- QueryExpression owner,
- String pattern,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException;
+ QueryTaskFuture<Void> getTargetsMatchingPattern(
+ QueryExpression owner, String pattern, Callback<T> callback);
/** Ensures the specified target exists. */
// NOTE(bazel-team): this method is left here as scaffolding from a previous refactoring. It may
@@ -203,14 +180,159 @@ public interface QueryEnvironment<T> {
Set<T> getNodesOnPath(T from, T to) throws InterruptedException;
/**
- * Eval an expression {@code expr} and pass the results to the {@code callback}.
+ * Returns a {@link QueryTaskFuture} representing the asynchronous evaluation of the given
+ * {@code expr} and passing of the results to the given {@code callback}.
*
* <p>Note that this method should guarantee that the callback does not see repeated elements.
+ *
* @param expr The expression to evaluate
* @param callback The caller callback to notify when results are available
*/
- void eval(QueryExpression expr, VariableContext<T> context, Callback<T> callback)
- throws QueryException, InterruptedException;
+ QueryTaskFuture<Void> eval(
+ QueryExpression expr, VariableContext<T> context, Callback<T> callback);
+
+ /**
+ * An asynchronous computation of part of a query evaluation.
+ *
+ * <p>A {@link QueryTaskFuture} can only be produced from scratch via {@link #eval},
+ * {@link #executeAsync}, {@link #immediateSuccessfulFuture}, {@link #immediateFailedFuture}, and
+ * {@link #immediateCancelledFuture}.
+ *
+ * <p>Combined with the helper methods like {@link #whenSucceedsCall} below, this is very similar
+ * to Guava's {@link ListenableFuture}.
+ *
+ * <p>This class is deliberately opaque; the only ways to compose/use {@link #QueryTaskFuture}
+ * instances are the helper methods like {@link #whenSucceedsCall} below. A crucial consequence of
+ * this is there is no way for a {@link QueryExpression} or {@link QueryFunction} implementation
+ * to block on the result of a {@link #QueryTaskFuture}. This eliminates a large class of
+ * deadlocks by design!
+ */
+ @ThreadSafe
+ public abstract class QueryTaskFuture<T> {
+ // We use a public abstract class with a private constructor so that this type is visible to all
+ // the query codebase, but yet the only possible implementation is under our control in this
+ // file.
+ private QueryTaskFuture() {}
+
+ /**
+ * If this {@link QueryTasksFuture}'s encapsulated computation is currently complete and
+ * successful, returns the result. This method is intended to be used in combination with
+ * {@link #whenSucceedsCall}.
+ *
+ * <p>See the javadoc for the various helper methods that produce {@link QueryTasksFuture} for
+ * the precise definition of "successful".
+ */
+ public abstract T getIfSuccessful();
+ }
+
+ /**
+ * Returns a {@link QueryTaskFuture} representing the successful computation of {@code value}.
+ *
+ * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+ * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+ * {@link QueryTaskFuture#getIfSuccessful}.
+ */
+ abstract <R> QueryTaskFuture<R> immediateSuccessfulFuture(R value);
+
+ /**
+ * Returns a {@link QueryTaskFuture} representing a computation that was unsuccessful because of
+ * {@code e}.
+ *
+ * <p>The returned {@link QueryTaskFuture} is considered "unsuccessful" for purposes of
+ * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+ * {@link QueryTaskFuture#getIfSuccessful}.
+ */
+ abstract <R> QueryTaskFuture<R> immediateFailedFuture(QueryException e);
+
+ /**
+ * Returns a {@link QueryTaskFuture} representing a cancelled computation.
+ *
+ * <p>The returned {@link QueryTaskFuture} is considered "unsuccessful" for purposes of
+ * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+ * {@link QueryTaskFuture#getIfSuccessful}.
+ */
+ abstract <R> QueryTaskFuture<R> immediateCancelledFuture();
+
+ /** A {@link ThreadSafe} {@link Callable} for computations during query evaluation. */
+ @ThreadSafe
+ public interface QueryTaskCallable<T> extends Callable<T> {
+ /**
+ * Returns the computed value or throws a {@link QueryException} on failure or a
+ * {@link InterruptedException} on interruption.
+ */
+ @Override
+ T call() throws QueryException, InterruptedException;
+ }
+
+ /**
+ * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being
+ * performed asynchronously.
+ *
+ * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+ * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+ * {@link QueryTaskFuture#getIfSuccessful} iff {@code callable#call} does not throw an exception.
+ */
+ <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable);
+
+ /**
+ * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being
+ * performed after the successful completion of the computation encapsulated by the given
+ * {@code future} has completed successfully.
+ *
+ * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+ * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+ * {@link QueryTaskFuture#getIfSuccessful} iff {@code future} is successful and
+ * {@code callable#call} does not throw an exception.
+ */
+ <R> QueryTaskFuture<R> whenSucceedsCall(QueryTaskFuture<?> future, QueryTaskCallable<R> callable);
+
+ /**
+ * Returns a {@link QueryTaskFuture} representing the successful completion of all the
+ * computations encapsulated by the given {@code futures}.
+ *
+ * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+ * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+ * {@link QueryTaskFuture#getIfSuccessful} iff all of the given computations are "successful".
+ */
+ QueryTaskFuture<Void> whenAllSucceed(Iterable<? extends QueryTaskFuture<?>> futures);
+
+ /**
+ * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being
+ * performed after the successful completion of all the computations encapsulated by the given
+ * {@code futures}.
+ *
+ * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+ * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+ * {@link QueryTaskFuture#getIfSuccessful} iff all of the given computations are "successful" and
+ * {@code callable#call} does not throw an exception.
+ */
+ <R> QueryTaskFuture<R> whenAllSucceedCall(
+ Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable);
+
+ /**
+ * Returns a {@link QueryTaskFuture} representing the asynchronous application of the given
+ * {@code function} to the value produced by the computation encapsulated by the given
+ * {@code future}.
+ *
+ * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
+ * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
+ * {@link QueryTaskFuture#getIfSuccessful} iff {@code} future is "successful".
+ */
+ <T1, T2> QueryTaskFuture<T2> transformAsync(
+ QueryTaskFuture<T1> future, Function<T1, QueryTaskFuture<T2>> function);
+
+ /**
+ * The sole package-protected subclass of {@link QueryTaskFuture}.
+ *
+ * <p>Do not subclass this class; it's an implementation detail. {@link QueryExpression} and
+ * {@link QueryFunction} implementations should use {@link #eval} and {@link #executeAsync} to get
+ * access to {@link QueryTaskFuture} instances and the then use the helper methods like
+ * {@link #whenSucceedsCall} to transform them.
+ */
+ abstract class QueryTaskFutureImplBase<T> extends QueryTaskFuture<T> {
+ protected QueryTaskFutureImplBase() {
+ }
+ }
/**
* Creates a Uniquifier for use in a {@code QueryExpression}. Note that the usage of this an
@@ -372,9 +494,6 @@ public interface QueryEnvironment<T> {
Set<QueryVisibility<T>> getVisibility(T from) throws QueryException, InterruptedException;
}
- /** Returns the {@link QueryExpressionEvalListener} that this {@link QueryEnvironment} uses. */
- QueryExpressionEvalListener<T> getEvalListener();
-
/** List of the default query functions. */
List<QueryFunction> DEFAULT_QUERY_FUNCTIONS =
ImmutableList.of(
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
index e35e9e4807..920722db4b 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
@@ -14,9 +14,8 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.Collection;
-import java.util.concurrent.ForkJoinPool;
/**
* Base class for expressions in the Blaze query language, revision 2.
@@ -59,9 +58,9 @@ public abstract class QueryExpression {
protected QueryExpression() {}
/**
- * Evaluates this query in the specified environment, and notifies the callback with a result.
- * Note that it is allowed to notify the callback with partial results instead of just one final
- * result.
+ * Returns a {@link QueryTaskFuture} representing the asynchronous evaluation of this query in the
+ * specified environment, notifying the callback with a result. Note that it is allowed to notify
+ * the callback with partial results instead of just one final result.
*
* <p>Failures resulting from evaluation of an ill-formed query cause
* QueryException to be thrown.
@@ -71,45 +70,10 @@ public abstract class QueryExpression {
* thrown. If disabled, evaluation will stumble on to produce a (possibly
* inaccurate) result, but a result nonetheless.
*/
- public final <T> void eval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- Callback<T> callback) throws QueryException, InterruptedException {
- env.getEvalListener().onEval(this, env, context, callback);
- evalImpl(env, context, callback);
- }
-
- protected abstract <T> void evalImpl(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- Callback<T> callback) throws QueryException, InterruptedException;
-
- /**
- * Evaluates this query in the specified environment, as in
- * {@link #eval(QueryEnvironment, VariableContext, Callback)}, using {@code forkJoinPool} to
- * achieve parallelism.
- *
- * <p>The caller must ensure that {@code env} is thread safe.
- */
- @ThreadSafe
- public final <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- env.getEvalListener().onParEval(this, env, context, callback, forkJoinPool);
- parEvalImpl(env, context, callback, forkJoinPool);
- }
-
- protected <T> void parEvalImpl(
+ public abstract <T> QueryTaskFuture<Void> eval(
QueryEnvironment<T> env,
VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- evalImpl(env, context, callback);
- }
+ Callback<T> callback);
/**
* Collects all target patterns that are referenced anywhere within this query expression and adds
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java
deleted file mode 100644
index e6bdaef7a9..0000000000
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.query2.engine;
-
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import java.util.concurrent.ForkJoinPool;
-
-/** Listener for calls to the internal methods of {@link QueryExpression} used for evaluation. */
-@ThreadSafe
-public interface QueryExpressionEvalListener<T> {
- /** Called right before {@link QueryExpression#evalImpl} is called. */
- void onEval(
- QueryExpression expr,
- QueryEnvironment<T> env,
- VariableContext<T> context,
- Callback<T> callback);
-
- /** Called right before {@link QueryExpression#parEvalImpl} is called. */
- void onParEval(
- QueryExpression expr,
- QueryEnvironment<T> env,
- VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool);
-
- /** A {@link QueryExpressionEvalListener} that does nothing. */
- class NullListener<T> implements QueryExpressionEvalListener<T> {
- private static final NullListener<?> INSTANCE = new NullListener<>();
-
- private NullListener() {
- }
-
- @SuppressWarnings("unchecked")
- public static <T> NullListener<T> instance() {
- return (NullListener<T>) INSTANCE;
- }
-
- @Override
- public void onEval(
- QueryExpression expr,
- QueryEnvironment<T> env,
- VariableContext<T> context,
- Callback<T> callback) {
- }
-
- @Override
- public void onParEval(
- QueryExpression expr,
- QueryEnvironment<T> env,
- VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) {
- }
- }
-}
-
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
index 2d7be2ed74..afb4192128 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
@@ -13,11 +13,13 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
+import com.google.common.collect.Sets;
import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.Collections;
import java.util.Set;
@@ -28,17 +30,18 @@ public final class QueryUtil {
/** A {@link Callback} that can aggregate all the partial results into one set. */
public interface AggregateAllCallback<T> extends Callback<T> {
+ /** Returns a (mutable) set of all the results. */
Set<T> getResult();
}
- /** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */
+ /** A {@link OutputFormatterCallback} that is also a {@link AggregateAllCallback}. */
public abstract static class AggregateAllOutputFormatterCallback<T>
- extends OutputFormatterCallback<T> implements AggregateAllCallback<T> {
+ extends ThreadSafeOutputFormatterCallback<T> implements AggregateAllCallback<T> {
}
private static class AggregateAllOutputFormatterCallbackImpl<T>
extends AggregateAllOutputFormatterCallback<T> {
- private final Set<T> result = CompactHashSet.create();
+ private final Set<T> result = Sets.newConcurrentHashSet();
@Override
public final void processOutput(Iterable<T> partialResult) {
@@ -51,65 +54,64 @@ public final class QueryUtil {
}
}
+ private static class OrderedAggregateAllOutputFormatterCallbackImpl<T>
+ extends AggregateAllOutputFormatterCallback<T> {
+ private final Set<T> result = CompactHashSet.create();
+
+ @Override
+ public final synchronized void processOutput(Iterable<T> partialResult) {
+ Iterables.addAll(result, partialResult);
+ }
+
+ @Override
+ public synchronized Set<T> getResult() {
+ return result;
+ }
+ }
+
/**
- * Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial
- * results into one set.
- *
- * <p>Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with
- * {@link #newAggregateAllCallback}.
+ * Returns a fresh {@link AggregateAllOutputFormatterCallback} instance whose
+ * {@link AggregateAllCallback#getResult} returns all the elements of the result in the order they
+ * were processed.
*/
public static <T> AggregateAllOutputFormatterCallback<T>
- newAggregateAllOutputFormatterCallback() {
- return new AggregateAllOutputFormatterCallbackImpl<>();
+ newOrderedAggregateAllOutputFormatterCallback() {
+ return new OrderedAggregateAllOutputFormatterCallbackImpl<>();
}
- /**
- * Returns a fresh {@link AggregateAllCallback}.
- *
- * <p>Intended to be used by {@link QueryExpression} implementations; contrast with
- * {@link #newAggregateAllOutputFormatterCallback}.
- */
+ /** Returns a fresh {@link AggregateAllCallback} instance. */
public static <T> AggregateAllCallback<T> newAggregateAllCallback() {
return new AggregateAllOutputFormatterCallbackImpl<>();
}
/**
- * Fully evaluate a {@code QueryExpression} and return a set with all the results.
+ * Returns a {@link QueryTaskFuture} representing the evaluation of {@code expr} as a (mutable)
+ * {@link Set} comprised of all the results.
*
* <p>Should only be used by QueryExpressions when it is the only way of achieving correctness.
*/
- public static <T> Set<T> evalAll(
- QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr)
- throws QueryException, InterruptedException {
- AggregateAllCallback<T> callback = newAggregateAllCallback();
- env.eval(expr, context, callback);
- return callback.getResult();
+ public static <T> QueryTaskFuture<Set<T>> evalAll(
+ QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr) {
+ final AggregateAllCallback<T> callback = newAggregateAllCallback();
+ return env.whenSucceedsCall(
+ env.eval(expr, context, callback),
+ new QueryTaskCallable<Set<T>>() {
+ @Override
+ public Set<T> call() {
+ return callback.getResult();
+ }
+ });
}
/** A trivial {@link Uniquifier} base class. */
- public abstract static class AbstractUniquifier<T, K>
- extends AbstractUniquifierBase<T, K> {
- private final CompactHashSet<K> alreadySeen = CompactHashSet.create();
+ public abstract static class AbstractUniquifier<T, K> implements Uniquifier<T> {
+ private final Set<K> alreadySeen;
- @Override
- public final boolean unique(T element) {
- return alreadySeen.add(extractKey(element));
+ protected AbstractUniquifier() {
+ this(/*concurrencyLevel=*/ 1);
}
- /**
- * Extracts an unique key that can be used to dedupe the given {@code element}.
- *
- * <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
- */
- protected abstract K extractKey(T element);
- }
-
- /** A trivial {@link ThreadSafeUniquifier} base class. */
- public abstract static class AbstractThreadSafeUniquifier<T, K>
- extends AbstractUniquifierBase<T, K> implements ThreadSafeUniquifier<T> {
- private final Set<K> alreadySeen;
-
- protected AbstractThreadSafeUniquifier(int concurrencyLevel) {
+ protected AbstractUniquifier(int concurrencyLevel) {
this.alreadySeen = Collections.newSetFromMap(
new MapMaker().concurrencyLevel(concurrencyLevel).<K, Boolean>makeMap());
}
@@ -119,15 +121,6 @@ public final class QueryUtil {
return alreadySeen.add(extractKey(element));
}
- /**
- * Extracts an unique key that can be used to dedupe the given {@code element}.
- *
- * <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
- */
- protected abstract K extractKey(T element);
- }
-
- private abstract static class AbstractUniquifierBase<T, K> implements Uniquifier<T> {
@Override
public final ImmutableList<T> unique(Iterable<T> newElements) {
ImmutableList.Builder<T> result = ImmutableList.builder();
@@ -138,5 +131,12 @@ public final class QueryUtil {
}
return result.build();
}
+
+ /**
+ * Extracts an unique key that can be used to dedupe the given {@code element}.
+ *
+ * <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
+ */
+ protected abstract K extractKey(T element);
}
-}
+} \ No newline at end of file
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
index 7d691c0b04..82faf72538 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
@@ -13,12 +13,14 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.List;
import java.util.Set;
@@ -54,16 +56,31 @@ public final class RdepsFunction extends AllRdepsFunction {
* towards the universe while staying within the transitive closure.
*/
@Override
- public <T> void eval(QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args, Callback<T> callback)
- throws QueryException,
- InterruptedException {
- Set<T> universeValue = QueryUtil.evalAll(env, context, args.get(0).getExpression());
- env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE);
-
- Predicate<T> universe = Predicates.in(env.getTransitiveClosure(universeValue));
- eval(env, context, args.subList(1, args.size()), callback, universe);
+ public <T> QueryTaskFuture<Void> eval(
+ final QueryEnvironment<T> env,
+ final VariableContext<T> context,
+ final QueryExpression expression,
+ final List<Argument> args,
+ final Callback<T> callback) {
+ QueryTaskFuture<Set<T>> universeValueFuture =
+ QueryUtil.evalAll(env, context, args.get(0).getExpression());
+ Function<Set<T>, QueryTaskFuture<Void>> evalInUniverseAsyncFunction =
+ new Function<Set<T>, QueryTaskFuture<Void>>() {
+ @Override
+ public QueryTaskFuture<Void> apply(Set<T> universeValue) {
+ Predicate<T> universe;
+ try {
+ env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE);
+ universe = Predicates.in(env.getTransitiveClosure(universeValue));
+ } catch (InterruptedException e) {
+ return env.immediateCancelledFuture();
+ } catch (QueryException e) {
+ return env.immediateFailedFuture(e);
+ }
+ return RdepsFunction.this.eval(
+ env, context, args.subList(1, args.size()), callback, Optional.of(universe));
+ }
+ };
+ return env.transformAsync(universeValueFuture, evalInUniverseAsyncFunction);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
index 9dc75a43e1..6b182ee7b1 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
@@ -18,9 +18,8 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.List;
-import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -33,25 +32,24 @@ public abstract class RegexFilterExpression implements QueryFunction {
}
@Override
- public <T> void eval(
+ public <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
QueryExpression expression,
final List<Argument> args,
- Callback<T> callback)
- throws QueryException, InterruptedException {
+ Callback<T> callback) {
String rawPattern = getPattern(args);
final Pattern compiledPattern;
try {
compiledPattern = Pattern.compile(rawPattern);
} catch (PatternSyntaxException e) {
- throw new QueryException(
+ return env.immediateFailedFuture(new QueryException(
expression,
String.format(
"illegal '%s' pattern regexp '%s': %s",
getName(),
rawPattern,
- e.getMessage()));
+ e.getMessage())));
}
// Note that Patttern#matcher is thread-safe and so this Predicate can safely be used
@@ -68,21 +66,10 @@ public abstract class RegexFilterExpression implements QueryFunction {
}
};
- env.eval(
+ return env.eval(
Iterables.getLast(args).getExpression(),
context,
- filteredCallback(callback, matchFilter));
- }
-
- @Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- eval(env, context, expression, args, callback);
+ new FilteredCallback<>(callback, matchFilter));
}
/**
@@ -111,21 +98,6 @@ public abstract class RegexFilterExpression implements QueryFunction {
protected abstract String getPattern(List<Argument> args);
- /**
- * Returns a new {@link Callback} that forwards values that satisfies the given {@link Predicate}
- * to the given {@code parentCallback}.
- *
- * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} iff
- * {@code parentCallback} is as well.
- */
- private static <T> Callback<T> filteredCallback(
- final Callback<T> parentCallback,
- final Predicate<T> retainIfTrue) {
- return (parentCallback instanceof ThreadSafeCallback)
- ? new ThreadSafeFilteredCallback<>((ThreadSafeCallback<T>) parentCallback, retainIfTrue)
- : new FilteredCallback<>(parentCallback, retainIfTrue);
- }
-
private static class FilteredCallback<T> implements Callback<T> {
private final Callback<T> parentCallback;
private final Predicate<T> retainIfTrue;
@@ -148,12 +120,4 @@ public abstract class RegexFilterExpression implements QueryFunction {
return "filtered parentCallback of : " + retainIfTrue;
}
}
-
- private static class ThreadSafeFilteredCallback<T>
- extends FilteredCallback<T> implements ThreadSafeCallback<T> {
- private ThreadSafeFilteredCallback(
- ThreadSafeCallback<T> parentCallback, Predicate<T> retainIfTrue) {
- super(parentCallback, retainIfTrue);
- }
- }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java
index ac4b460f40..e1eadf3aa5 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java
@@ -14,7 +14,8 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.base.Joiner;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -46,12 +47,13 @@ class SetExpression extends QueryExpression {
}
@Override
- protected <T> void evalImpl(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
- throws QueryException, InterruptedException {
+ public <T> QueryTaskFuture<Void> eval(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
+ ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(words.size());
for (TargetLiteral expr : words) {
- env.eval(expr, context, callback);
+ queryTasks.add(env.eval(expr, context, callback));
}
+ return env.whenAllSucceed(queryTasks);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
index 8dc0442468..4b07a99f07 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
@@ -19,8 +19,9 @@ import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.List;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -49,36 +50,37 @@ class SomeFunction implements QueryFunction {
}
@Override
- public <T> void eval(
+ public <T> QueryTaskFuture<Void> eval(
QueryEnvironment<T> env,
VariableContext<T> context,
- QueryExpression expression,
+ final QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) throws QueryException, InterruptedException {
+ final Callback<T> callback) {
final AtomicBoolean someFound = new AtomicBoolean(false);
- env.eval(args.get(0).getExpression(), context, new Callback<T>() {
- @Override
- public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
- if (someFound.get() || Iterables.isEmpty(partialResult)) {
- return;
- }
- callback.process(ImmutableSet.of(partialResult.iterator().next()));
- someFound.set(true);
- }
- });
- if (!someFound.get()) {
- throw new QueryException(expression, "argument set is empty");
- }
- }
-
- @Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- eval(env, context, expression, args, callback);
+ QueryTaskFuture<Void> operandEvalFuture = env.eval(
+ args.get(0).getExpression(),
+ context,
+ new Callback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult)
+ throws QueryException, InterruptedException {
+ if (someFound.get() || Iterables.isEmpty(partialResult)) {
+ return;
+ }
+ callback.process(ImmutableSet.of(partialResult.iterator().next()));
+ someFound.set(true);
+ }
+ });
+ return env.whenSucceedsCall(
+ operandEvalFuture,
+ new QueryTaskCallable<Void>() {
+ @Override
+ public Void call() throws QueryException {
+ if (!someFound.get()) {
+ throw new QueryException(expression, "argument set is empty");
+ }
+ return null;
+ }
+ });
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
index 2d0df0ef59..229863c79a 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
@@ -20,10 +20,10 @@ import com.google.common.collect.Sets.SetView;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* A somepath(x, y) query expression, which computes the set of nodes
@@ -51,50 +51,52 @@ class SomePathFunction implements QueryFunction {
}
@Override
- public <T> void eval(
- QueryEnvironment<T> env,
+ public <T> QueryTaskFuture<Void> eval(
+ final QueryEnvironment<T> env,
VariableContext<T> context,
- QueryExpression expression,
+ final QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) throws QueryException, InterruptedException {
- Set<T> fromValue = QueryUtil.evalAll(env, context, args.get(0).getExpression());
- Set<T> toValue = QueryUtil.evalAll(env, context, args.get(1).getExpression());
+ final Callback<T> callback) {
+ final QueryTaskFuture<Set<T>> fromValueFuture =
+ QueryUtil.evalAll(env, context, args.get(0).getExpression());
+ final QueryTaskFuture<Set<T>> toValueFuture =
+ QueryUtil.evalAll(env, context, args.get(1).getExpression());
- // Implementation strategy: for each x in "from", compute its forward
- // transitive closure. If it intersects "to", then do a path search from x
- // to an arbitrary node in the intersection, and return the path. This
- // avoids computing the full transitive closure of "from" in some cases.
+ return env.whenAllSucceedCall(
+ ImmutableList.of(fromValueFuture, toValueFuture),
+ new QueryTaskCallable<Void>() {
+ @Override
+ public Void call() throws QueryException, InterruptedException {
+ // Implementation strategy: for each x in "from", compute its forward
+ // transitive closure. If it intersects "to", then do a path search from x
+ // to an arbitrary node in the intersection, and return the path. This
+ // avoids computing the full transitive closure of "from" in some cases.
- env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
+ Set<T> fromValue = fromValueFuture.getIfSuccessful();
+ Set<T> toValue = toValueFuture.getIfSuccessful();
- // This set contains all nodes whose TC does not intersect "toValue".
- Uniquifier<T> uniquifier = env.createUniquifier();
+ env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
- for (T x : uniquifier.unique(fromValue)) {
- Set<T> xtc = env.getTransitiveClosure(ImmutableSet.of(x));
- SetView<T> result;
- if (xtc.size() > toValue.size()) {
- result = Sets.intersection(toValue, xtc);
- } else {
- result = Sets.intersection(xtc, toValue);
- }
- if (!result.isEmpty()) {
- callback.process(env.getNodesOnPath(x, result.iterator().next()));
- return;
- }
- uniquifier.unique(xtc);
- }
- callback.process(ImmutableSet.<T>of());
- }
+ // This set contains all nodes whose TC does not intersect "toValue".
+ Uniquifier<T> uniquifier = env.createUniquifier();
- @Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- eval(env, context, expression, args, callback);
+ for (T x : uniquifier.unique(fromValue)) {
+ Set<T> xtc = env.getTransitiveClosure(ImmutableSet.of(x));
+ SetView<T> result;
+ if (xtc.size() > toValue.size()) {
+ result = Sets.intersection(toValue, xtc);
+ } else {
+ result = Sets.intersection(xtc, toValue);
+ }
+ if (!result.isEmpty()) {
+ callback.process(env.getNodesOnPath(x, result.iterator().next()));
+ return null;
+ }
+ uniquifier.unique(xtc);
+ }
+ callback.process(ImmutableSet.<T>of());
+ return null;
+ }
+ });
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
index eda505a7ce..bb67e93ad5 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
@@ -14,7 +14,6 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.base.Predicate;
-import java.util.concurrent.ForkJoinPool;
/**
* The environment of a Blaze query which supports predefined streaming operations.
@@ -24,22 +23,19 @@ import java.util.concurrent.ForkJoinPool;
public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> {
/** Retrieve and process all reverse dependencies of given expression in a streaming manner. */
- void getAllRdeps(
+ QueryTaskFuture<Void> getAllRdeps(
QueryExpression expression,
Predicate<T> universe,
VariableContext<T> context,
Callback<T> callback,
- int depth)
- throws QueryException, InterruptedException;
+ int depth);
/**
* Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound, making use of the
* provided {@code forkJoinPool}.
*/
- void getAllRdepsUnboundedParallel(
+ QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
QueryExpression expression,
VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException;
+ Callback<T> callback);
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java
new file mode 100644
index 0000000000..68a79338b7
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java
@@ -0,0 +1,58 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import java.io.IOException;
+import javax.annotation.Nullable;
+
+/**
+ * A {@link ThreadSafeOutputFormatterCallback} wrapper around a {@link OutputFormatterCallback}
+ * delegate.
+ */
+public final class SynchronizedDelegatingOutputFormatterCallback<T>
+ extends ThreadSafeOutputFormatterCallback<T> {
+ private final OutputFormatterCallback<T> delegate;
+
+ public SynchronizedDelegatingOutputFormatterCallback(OutputFormatterCallback<T> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public synchronized void start() throws IOException {
+ delegate.start();
+ }
+
+ @Override
+ public synchronized void close(boolean failFast) throws InterruptedException, IOException {
+ delegate.close(failFast);
+ }
+
+ @Override
+ public synchronized void process(Iterable<T> partialResult)
+ throws QueryException, InterruptedException {
+ delegate.process(partialResult);
+ }
+
+ @Override
+ public synchronized void processOutput(Iterable<T> partialResult)
+ throws IOException, InterruptedException {
+ delegate.processOutput(partialResult);
+ }
+
+ @Override
+ @Nullable
+ public IOException getIoException() {
+ return delegate.getIoException();
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
index aeace9aa70..733bffb065 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
@@ -13,11 +13,10 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.util.Preconditions;
-
import java.util.Collection;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* A literal set of targets, using 'blaze build' syntax. Or, a reference to a
@@ -45,38 +44,31 @@ public final class TargetLiteral extends QueryExpression {
return LetExpression.isValidVarReference(pattern);
}
- private <T> void evalVarReference(VariableContext<T> context, Callback<T> callback)
- throws QueryException, InterruptedException {
+ private <T> QueryTaskFuture<Void> evalVarReference(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
String varName = LetExpression.getNameFromReference(pattern);
Set<T> value = context.get(varName);
if (value == null) {
- throw new QueryException(this, "undefined variable '" + varName + "'");
+ return env.immediateFailedFuture(
+ new QueryException(this, "undefined variable '" + varName + "'"));
}
- callback.process(value);
- }
-
- @Override
- protected <T> void evalImpl(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
- throws QueryException, InterruptedException {
- if (isVariableReference()) {
- evalVarReference(context, callback);
- } else {
- env.getTargetsMatchingPattern(this, pattern, callback);
+ try {
+ callback.process(value);
+ return env.immediateSuccessfulFuture(null);
+ } catch (QueryException e) {
+ return env.immediateFailedFuture(e);
+ } catch (InterruptedException e) {
+ return env.immediateCancelledFuture();
}
}
@Override
- protected <T> void parEvalImpl(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
+ public <T> QueryTaskFuture<Void> eval(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
if (isVariableReference()) {
- evalVarReference(context, callback);
+ return evalVarReference(env, context, callback);
} else {
- env.getTargetsMatchingPatternPar(this, pattern, callback, forkJoinPool);
+ return env.getTargetsMatchingPattern(this, pattern, callback);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
index 956e604a44..d9ed576a5d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
@@ -15,9 +15,11 @@ package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
import java.util.ArrayList;
import java.util.Collection;
@@ -27,7 +29,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* A tests(x) filter expression, which returns all the tests in set x,
@@ -62,15 +63,15 @@ class TestsFunction implements QueryFunction {
}
@Override
- public <T> void eval(
+ public <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) throws QueryException, InterruptedException {
+ final Callback<T> callback) {
final Closure<T> closure = new Closure<>(expression, env);
- env.eval(args.get(0).getExpression(), context, new Callback<T>() {
+ return env.eval(args.get(0).getExpression(), context, new Callback<T>() {
@Override
public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
for (T target : partialResult) {
@@ -86,17 +87,6 @@ class TestsFunction implements QueryFunction {
});
}
- @Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- eval(env, context, expression, args, callback);
- }
-
/**
* Decides whether to include a test in a test_suite or not.
* @param testTags Collection of all tags exhibited by a given test.
@@ -151,10 +141,8 @@ class TestsFunction implements QueryFunction {
}
}
- /**
- * A closure over the temporary state needed to compute the expression. This makes the evaluation
- * thread-safe, as long as instances of this class are used only within a single thread.
- */
+ /** A closure over the temporary state needed to compute the expression. */
+ @ThreadSafe
private static final class Closure<T> {
private final QueryExpression expression;
/** A dynamically-populated mapping from test_suite rules to their tests. */
@@ -177,7 +165,8 @@ class TestsFunction implements QueryFunction {
*
* @precondition env.getAccessor().isTestSuite(testSuite)
*/
- private Set<T> getTestsInSuite(T testSuite) throws QueryException, InterruptedException {
+ private synchronized Set<T> getTestsInSuite(T testSuite)
+ throws QueryException, InterruptedException {
Set<T> tests = testsInSuite.get(testSuite);
if (tests == null) {
tests = Sets.newHashSet();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java
deleted file mode 100644
index 950335e38a..0000000000
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java
+++ /dev/null
@@ -1,23 +0,0 @@
-// Copyright 2014 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.query2.engine;
-
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
-
-/** Marker interface for a {@link Callback} that is {@link ThreadSafe}. */
-@ThreadSafe
-public interface ThreadSafeCallback<T>
- extends Callback<T>, ThreadSafeBatchCallback<T, QueryException> {
-}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java
index 747185582f..bc3eb59f84 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java
@@ -1,4 +1,4 @@
-// Copyright 2016 The Bazel Authors. All rights reserved.
+// Copyright 2017 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,8 +15,7 @@ package com.google.devtools.build.lib.query2.engine;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-/** Marker interface for a {@link ThreadSafe} {@link Uniquifier}. */
+/** A marker parent class for a {@link ThreadSafe} {@link OutputFormatterCallback}. */
@ThreadSafe
-public interface ThreadSafeUniquifier<T> extends Uniquifier<T> {
-}
-
+public abstract class ThreadSafeOutputFormatterCallback<T> extends OutputFormatterCallback<T> {
+} \ No newline at end of file
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java
index ed2b2376fa..5f8faf56b1 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java
@@ -14,8 +14,10 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
/** A helper for deduping values. */
+@ThreadSafe
public interface Uniquifier<T> {
/** Returns whether {@code newElement} has been seen before. */
boolean unique(T newElement);
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
index 532f331378..b09910c715 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
@@ -14,13 +14,14 @@
package com.google.devtools.build.lib.query2.engine;
+import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* A visible(x, y) query expression, which computes the subset of nodes in y
@@ -52,34 +53,32 @@ public class VisibleFunction implements QueryFunction {
}
@Override
- public <T> void eval(
+ public <T> QueryTaskFuture<Void> eval(
final QueryEnvironment<T> env,
- VariableContext<T> context,
+ final VariableContext<T> context,
QueryExpression expression,
- List<Argument> args,
- final Callback<T> callback) throws QueryException, InterruptedException {
- final Set<T> toSet = QueryUtil.evalAll(env, context, args.get(0).getExpression());
- env.eval(args.get(1).getExpression(), context, new Callback<T>() {
- @Override
- public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
- for (T t : partialResult) {
- if (visibleToAll(env, toSet, t)) {
- callback.process(ImmutableList.of(t));
+ final List<Argument> args,
+ final Callback<T> callback) {
+ final QueryTaskFuture<Set<T>> toSetFuture =
+ QueryUtil.evalAll(env, context, args.get(0).getExpression());
+ Function<Set<T>, QueryTaskFuture<Void>> computeVisibleNodesAsyncFunction =
+ new Function<Set<T>, QueryTaskFuture<Void>>() {
+ @Override
+ public QueryTaskFuture<Void> apply(final Set<T> toSet) {
+ return env.eval(args.get(1).getExpression(), context, new Callback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult)
+ throws QueryException, InterruptedException {
+ for (T t : partialResult) {
+ if (visibleToAll(env, toSet, t)) {
+ callback.process(ImmutableList.of(t));
+ }
+ }
+ }
+ });
}
- }
- }
- });
- }
-
- @Override
- public <T> void parEval(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- QueryExpression expression,
- List<Argument> args,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
- eval(env, context, expression, args, callback);
+ };
+ return env.transformAsync(toSetFuture, computeVisibleNodesAsyncFunction);
}
/** Returns true if {@code target} is visible to all targets in {@code toSet}. */
diff --git a/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java
index 18890c3eb6..5c314b6a5c 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/output/OutputFormatter.java
@@ -35,6 +35,8 @@ import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.packages.TriState;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
+import com.google.devtools.build.lib.query2.engine.SynchronizedDelegatingOutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.query2.output.QueryOptions.OrderOutput;
import com.google.devtools.build.lib.syntax.EvalUtils;
import com.google.devtools.build.lib.syntax.Printer;
@@ -178,15 +180,16 @@ public abstract class OutputFormatter implements Serializable {
void setOptions(QueryOptions options, AspectResolver aspectResolver);
/**
- * Returns a {@link OutputFormatterCallback} whose {@link OutputFormatterCallback#process}
- * outputs formatted {@link Target}s to the given {@code out}.
+ * Returns a {@link ThreadSafeOutputFormatterCallback} whose
+ * {@link OutputFormatterCallback#process} outputs formatted {@link Target}s to the given
+ * {@code out}.
*
* <p>Takes any options specified via the most recent call to {@link #setOptions} into
* consideration.
*
* <p>Intended to be use for streaming out during evaluation of a query.
*/
- OutputFormatterCallback<Target> createStreamCallback(
+ ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
OutputStream out, QueryOptions options, QueryEnvironment<?> env);
/**
@@ -288,9 +291,10 @@ public abstract class OutputFormatter implements Serializable {
}
@Override
- public OutputFormatterCallback<Target> createStreamCallback(
+ public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
- return createPostFactoStreamCallback(out, options);
+ return new SynchronizedDelegatingOutputFormatterCallback<>(
+ createPostFactoStreamCallback(out, options));
}
}
@@ -345,9 +349,10 @@ public abstract class OutputFormatter implements Serializable {
}
@Override
- public OutputFormatterCallback<Target> createStreamCallback(
+ public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
- return createPostFactoStreamCallback(out, options);
+ return new SynchronizedDelegatingOutputFormatterCallback<>(
+ createPostFactoStreamCallback(out, options));
}
}
@@ -387,9 +392,10 @@ public abstract class OutputFormatter implements Serializable {
}
@Override
- public OutputFormatterCallback<Target> createStreamCallback(
+ public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
- return createPostFactoStreamCallback(out, options);
+ return new SynchronizedDelegatingOutputFormatterCallback<>(
+ createPostFactoStreamCallback(out, options));
}
}
@@ -478,9 +484,10 @@ public abstract class OutputFormatter implements Serializable {
}
@Override
- public OutputFormatterCallback<Target> createStreamCallback(
+ public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
- return createPostFactoStreamCallback(out, options);
+ return new SynchronizedDelegatingOutputFormatterCallback<>(
+ createPostFactoStreamCallback(out, options));
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java
index 66e1182c74..f966afb88d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/output/ProtoOutputFormatter.java
@@ -41,6 +41,8 @@ import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.FakeSubincludeTarget;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
+import com.google.devtools.build.lib.query2.engine.SynchronizedDelegatingOutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.query2.output.AspectResolver.BuildFileDependencyMode;
import com.google.devtools.build.lib.query2.output.OutputFormatter.AbstractUnorderedFormatter;
import com.google.devtools.build.lib.query2.output.QueryOptions.OrderOutput;
@@ -50,7 +52,6 @@ import com.google.devtools.build.lib.query2.proto.proto2api.Build.QueryResult.Bu
import com.google.devtools.build.lib.query2.proto.proto2api.Build.SourceFile;
import com.google.devtools.build.lib.syntax.Environment;
import com.google.devtools.build.lib.syntax.Type;
-
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
@@ -130,9 +131,10 @@ public class ProtoOutputFormatter extends AbstractUnorderedFormatter {
}
@Override
- public OutputFormatterCallback<Target> createStreamCallback(
+ public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
- return createPostFactoStreamCallback(out, options);
+ return new SynchronizedDelegatingOutputFormatterCallback<>(
+ createPostFactoStreamCallback(out, options));
}
private static Iterable<Target> getSortedLabels(Digraph<Target> result) {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java b/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java
index 193ef18cf7..6784ed3917 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/output/XmlOutputFormatter.java
@@ -29,10 +29,11 @@ import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.FakeSubincludeTarget;
import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment;
+import com.google.devtools.build.lib.query2.engine.SynchronizedDelegatingOutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.query2.output.AspectResolver.BuildFileDependencyMode;
import com.google.devtools.build.lib.query2.output.OutputFormatter.AbstractUnorderedFormatter;
import com.google.devtools.build.lib.syntax.Type;
-
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
@@ -62,9 +63,10 @@ class XmlOutputFormatter extends AbstractUnorderedFormatter {
}
@Override
- public OutputFormatterCallback<Target> createStreamCallback(
+ public ThreadSafeOutputFormatterCallback<Target> createStreamCallback(
OutputStream out, QueryOptions options, QueryEnvironment<?> env) {
- return createPostFactoStreamCallback(out, options);
+ return new SynchronizedDelegatingOutputFormatterCallback<>(
+ createPostFactoStreamCallback(out, options));
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java
index 589822b805..e01aa4fcbd 100644
--- a/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java
+++ b/src/main/java/com/google/devtools/build/lib/rules/genquery/GenQuery.java
@@ -55,7 +55,6 @@ import com.google.devtools.build.lib.query2.engine.DigraphQueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
import com.google.devtools.build.lib.query2.engine.QueryException;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
import com.google.devtools.build.lib.query2.engine.QueryUtil;
import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllOutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.SkyframeRestartQueryException;
@@ -284,7 +283,7 @@ public class GenQuery implements RuleConfiguredTargetFactory {
DigraphQueryEvalResult<Target> queryResult;
OutputFormatter formatter;
AggregateAllOutputFormatterCallback<Target> targets =
- QueryUtil.newAggregateAllOutputFormatterCallback();
+ QueryUtil.newOrderedAggregateAllOutputFormatterCallback();
try {
Set<Setting> settings = queryOptions.toSettings();
@@ -318,7 +317,6 @@ public class GenQuery implements RuleConfiguredTargetFactory {
getEventHandler(ruleContext),
settings,
ImmutableList.<QueryFunction>of(),
- QueryExpressionEvalListener.NullListener.<Target>instance(),
/*packagePath=*/null);
queryResult = (DigraphQueryEvalResult<Target>) queryEnvironment.evaluateQuery(query, targets);
} catch (SkyframeRestartQueryException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java
index c52a3fe0d9..118f529220 100644
--- a/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java
+++ b/src/main/java/com/google/devtools/build/lib/runtime/commands/QueryCommand.java
@@ -23,14 +23,13 @@ import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.pkgcache.PackageCacheOptions;
import com.google.devtools.build.lib.query2.AbstractBlazeQueryEnvironment;
-import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
-import com.google.devtools.build.lib.query2.engine.QueryExpressionEvalListener;
import com.google.devtools.build.lib.query2.engine.QueryUtil;
import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllOutputFormatterCallback;
+import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback;
import com.google.devtools.build.lib.query2.output.OutputFormatter;
import com.google.devtools.build.lib.query2.output.OutputFormatter.StreamedFormatter;
import com.google.devtools.build.lib.query2.output.QueryOptions;
@@ -150,7 +149,7 @@ public final class QueryCommand implements BlazeCommand {
expr = queryEnv.transformParsedQuery(expr);
OutputStream out = env.getReporter().getOutErr().getOutputStream();
- OutputFormatterCallback<Target> callback;
+ ThreadSafeOutputFormatterCallback<Target> callback;
if (streamResults) {
disableAnsiCharactersFiltering(env);
@@ -161,7 +160,7 @@ public final class QueryCommand implements BlazeCommand {
queryOptions.aspectDeps.createResolver(env.getPackageManager(), env.getReporter()));
callback = streamedFormatter.createStreamCallback(out, queryOptions, queryEnv);
} else {
- callback = QueryUtil.newAggregateAllOutputFormatterCallback();
+ callback = QueryUtil.newOrderedAggregateAllOutputFormatterCallback();
}
boolean catastrophe = true;
try {
@@ -207,8 +206,7 @@ public final class QueryCommand implements BlazeCommand {
// 3. Output results:
try {
- Set<Target> targets =
- ((AggregateAllOutputFormatterCallback<Target>) callback).getResult();
+ Set<Target> targets = ((AggregateAllOutputFormatterCallback<Target>) callback).getResult();
QueryOutputUtils.output(
queryOptions,
result,
@@ -277,7 +275,6 @@ public final class QueryCommand implements BlazeCommand {
env.getReporter(),
settings,
env.getRuntime().getQueryFunctions(),
- QueryExpressionEvalListener.NullListener.<Target>instance(),
env.getPackageManager().getPackagePath());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java
index df6351a513..3b4b768793 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/PrepareDepsOfPatternFunction.java
@@ -37,7 +37,6 @@ import com.google.devtools.build.lib.skyframe.EnvironmentBackedRecursivePackageP
import com.google.devtools.build.lib.util.BatchCallback;
import com.google.devtools.build.lib.util.BatchCallback.NullCallback;
import com.google.devtools.build.lib.util.Preconditions;
-import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.RootedPath;
@@ -47,7 +46,6 @@ import com.google.devtools.build.skyframe.SkyKey;
import com.google.devtools.build.skyframe.SkyValue;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -131,7 +129,7 @@ public class PrepareDepsOfPatternFunction implements SkyFunction {
* transitive dependencies. Its methods may throw {@link MissingDepException} if the package
* values this depends on haven't been calculated and added to its environment.
*/
- static class DepsOfPatternPreparer implements TargetPatternResolver<Void> {
+ static class DepsOfPatternPreparer extends TargetPatternResolver<Void> {
private final EnvironmentBackedRecursivePackageProvider packageProvider;
private final Environment env;
@@ -230,7 +228,8 @@ public class PrepareDepsOfPatternFunction implements SkyFunction {
String directory,
boolean rulesOnly,
ImmutableSet<PathFragment> excludedSubdirectories,
- BatchCallback<Void, E> callback, Class<E> exceptionClass)
+ BatchCallback<Void, E> callback,
+ Class<E> exceptionClass)
throws TargetParsingException, E, InterruptedException {
FilteringPolicy policy =
rulesOnly ? FilteringPolicies.RULES_ONLY : FilteringPolicies.NO_FILTER;
@@ -261,26 +260,5 @@ public class PrepareDepsOfPatternFunction implements SkyFunction {
}
}
}
-
- @Override
- public <E extends Exception> void findTargetsBeneathDirectoryPar(
- RepositoryName repository,
- String originalPattern,
- String directory,
- boolean rulesOnly,
- ImmutableSet<PathFragment> excludedSubdirectories,
- ThreadSafeBatchCallback<Void, E> callback,
- Class<E> exceptionClass,
- ForkJoinPool forkJoinPool)
- throws TargetParsingException, E, InterruptedException {
- findTargetsBeneathDirectory(
- repository,
- originalPattern,
- directory,
- rulesOnly,
- excludedSubdirectories,
- callback,
- exceptionClass);
- }
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
index 5c6bd42ee8..dc84f5b6a1 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java
@@ -22,6 +22,9 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
@@ -29,7 +32,6 @@ import com.google.devtools.build.lib.cmdline.RepositoryName;
import com.google.devtools.build.lib.cmdline.ResolvedTargets;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPatternResolver;
-import com.google.devtools.build.lib.concurrent.MoreFutures;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import com.google.devtools.build.lib.events.Event;
@@ -51,9 +53,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -61,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
@ThreadCompatible
public class RecursivePackageProviderBackedTargetPatternResolver
- implements TargetPatternResolver<Target> {
+ extends TargetPatternResolver<Target> {
// TODO(janakr): Move this to a more generic place and unify with SkyQueryEnvironment's value?
private static final int MAX_PACKAGES_BULK_GET = 1000;
@@ -194,56 +193,64 @@ public class RecursivePackageProviderBackedTargetPatternResolver
BatchCallback<Target, E> callback,
Class<E> exceptionClass)
throws TargetParsingException, E, InterruptedException {
- findTargetsBeneathDirectoryParImpl(
- repository,
- originalPattern,
- directory,
- rulesOnly,
- excludedSubdirectories,
- new SynchronizedBatchCallback<Target, E>(callback),
- exceptionClass,
- MoreExecutors.newDirectExecutorService());
+ try {
+ findTargetsBeneathDirectoryAsyncImpl(
+ repository,
+ originalPattern,
+ directory,
+ rulesOnly,
+ excludedSubdirectories,
+ new SynchronizedBatchCallback<Target, E>(callback),
+ MoreExecutors.newDirectExecutorService()).get();
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ Throwables.propagateIfPossible(cause, TargetParsingException.class, exceptionClass);
+ throw new IllegalStateException(e.getCause());
+ }
}
@Override
- public <E extends Exception> void findTargetsBeneathDirectoryPar(
- final RepositoryName repository,
- final String originalPattern,
+ public <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsync(
+ RepositoryName repository,
+ String originalPattern,
String directory,
boolean rulesOnly,
ImmutableSet<PathFragment> excludedSubdirectories,
- final ThreadSafeBatchCallback<Target, E> callback,
+ ThreadSafeBatchCallback<Target, E> callback,
Class<E> exceptionClass,
- ForkJoinPool forkJoinPool)
- throws TargetParsingException, E, InterruptedException {
- findTargetsBeneathDirectoryParImpl(
+ ListeningExecutorService executor) {
+ return findTargetsBeneathDirectoryAsyncImpl(
repository,
originalPattern,
directory,
rulesOnly,
excludedSubdirectories,
- callback,
- exceptionClass,
- forkJoinPool);
+ new SynchronizedBatchCallback<Target, E>(callback),
+ executor);
}
- private <E extends Exception> void findTargetsBeneathDirectoryParImpl(
+ private <E extends Exception> ListenableFuture<Void> findTargetsBeneathDirectoryAsyncImpl(
final RepositoryName repository,
final String originalPattern,
String directory,
boolean rulesOnly,
ImmutableSet<PathFragment> excludedSubdirectories,
final ThreadSafeBatchCallback<Target, E> callback,
- Class<E> exceptionClass,
- ExecutorService executor)
- throws TargetParsingException, E, InterruptedException {
+ ListeningExecutorService executor) {
final FilteringPolicy actualPolicy = rulesOnly
? FilteringPolicies.and(FilteringPolicies.RULES_ONLY, policy)
: policy;
- PathFragment pathFragment = TargetPatternResolverUtil.getPathFragment(directory);
- Iterable<PathFragment> packagesUnderDirectory =
- recursivePackageProvider.getPackagesUnderDirectory(
- repository, pathFragment, excludedSubdirectories);
+ final PathFragment pathFragment;
+ Iterable<PathFragment> packagesUnderDirectory;
+ try {
+ pathFragment = TargetPatternResolverUtil.getPathFragment(directory);
+ packagesUnderDirectory = recursivePackageProvider.getPackagesUnderDirectory(
+ repository, pathFragment, excludedSubdirectories);
+ } catch (TargetParsingException e) {
+ return Futures.immediateFailedFuture(e);
+ } catch (InterruptedException e) {
+ return Futures.immediateCancelledFuture();
+ }
Iterable<PackageIdentifier> pkgIds = Iterables.transform(packagesUnderDirectory,
new Function<PathFragment, PackageIdentifier>() {
@@ -258,9 +265,9 @@ public class RecursivePackageProviderBackedTargetPatternResolver
// into batches.
List<List<PackageIdentifier>> partitions =
ImmutableList.copyOf(Iterables.partition(pkgIds, MAX_PACKAGES_BULK_GET));
- ArrayList<Future<Void>> tasks = new ArrayList<>(partitions.size());
+ ArrayList<ListenableFuture<Void>> futures = new ArrayList<>(partitions.size());
for (final Iterable<PackageIdentifier> pkgIdBatch : partitions) {
- tasks.add(executor.submit(new Callable<Void>() {
+ futures.add(executor.submit(new Callable<Void>() {
@Override
public Void call() throws E, TargetParsingException, InterruptedException {
ImmutableSet<PackageIdentifier> pkgIdBatchSet = ImmutableSet.copyOf(pkgIdBatch);
@@ -288,17 +295,15 @@ public class RecursivePackageProviderBackedTargetPatternResolver
}
}));
}
- try {
- MoreFutures.waitForAllInterruptiblyFailFast(tasks);
- } catch (ExecutionException e) {
- Throwables.propagateIfPossible(e.getCause(), exceptionClass);
- Throwables.propagateIfPossible(
- e.getCause(), TargetParsingException.class, InterruptedException.class);
- throw new IllegalStateException(e);
- }
- if (!foundTarget.get()) {
- throw new TargetParsingException("no targets found beneath '" + pathFragment + "'");
- }
+ return Futures.whenAllSucceed(futures).call(new Callable<Void>() {
+ @Override
+ public Void call() throws TargetParsingException {
+ if (!foundTarget.get()) {
+ throw new TargetParsingException("no targets found beneath '" + pathFragment + "'");
+ }
+ return null;
+ }
+ });
}
private static <T> int calculateSize(Iterable<ResolvedTargets<T>> resolvedTargets) {
@@ -308,5 +313,6 @@ public class RecursivePackageProviderBackedTargetPatternResolver
}
return size;
}
+
}
diff --git a/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java b/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java
index bc74b7a161..91b4be86ab 100644
--- a/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/util/BatchCallback.java
@@ -13,11 +13,14 @@
// limitations under the License.
package com.google.devtools.build.lib.util;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+
/**
* Callback to be invoked when part of a result has been computed. Allows a client interested in
* the result to process it as it is computed, for instance by streaming it, if it is too big to
* fit in memory.
*/
+@ThreadSafe
public interface BatchCallback<T, E extends Exception> {
/**
* Called when part of a result has been computed.