aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/engine
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2017-03-01 02:55:48 +0000
committerGravatar Yue Gan <yueg@google.com>2017-03-01 12:35:42 +0000
commitb869e584641521dc063f0589d7ce6a850ffc0b61 (patch)
tree2694c728b3955154587e1425642e77023c88fd30 /src/main/java/com/google/devtools/build/lib/query2/engine
parent81aca8a39dc8b44bba8637998627201240a9c48c (diff)
-- PiperOrigin-RevId: 148844518 MOS_MIGRATED_REVID=148844518
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java194
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java73
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java45
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java202
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java24
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java5
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java19
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java19
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java20
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java25
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java21
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java2
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java188
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java207
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java48
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java67
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java108
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java41
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java50
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java12
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java58
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java82
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java12
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java58
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java40
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java23
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java (renamed from src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java)9
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java2
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java51
30 files changed, 883 insertions, 851 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java
deleted file mode 100644
index 62fd91b56f..0000000000
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AbstractQueryEnvironment.java
+++ /dev/null
@@ -1,194 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.query2.engine;
-
-import com.google.common.base.Function;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.AsyncFunction;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
-import com.google.devtools.build.lib.util.Preconditions;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * A partial implementation of {@link QueryEnvironment} that has trivial in-thread implementations
- * of all the {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods.
- */
-public abstract class AbstractQueryEnvironment<T> implements QueryEnvironment<T> {
- /** Concrete implementation of {@link QueryTaskFuture}. */
- protected static final class QueryTaskFutureImpl<T>
- extends QueryTaskFutureImplBase<T> implements ListenableFuture<T> {
- private final ListenableFuture<T> delegate;
-
- private QueryTaskFutureImpl(ListenableFuture<T> delegate) {
- this.delegate = delegate;
- }
-
- public static <R> QueryTaskFutureImpl<R> ofDelegate(ListenableFuture<R> delegate) {
- return (delegate instanceof QueryTaskFutureImpl)
- ? (QueryTaskFutureImpl<R>) delegate
- : new QueryTaskFutureImpl<>(delegate);
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return delegate.cancel(mayInterruptIfRunning);
- }
-
- @Override
- public boolean isCancelled() {
- return delegate.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return delegate.isDone();
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- return delegate.get();
- }
-
- @Override
- public T get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return delegate.get(timeout, unit);
- }
-
- @Override
- public void addListener(Runnable listener, Executor executor) {
- delegate.addListener(listener, executor);
- }
-
- @Override
- public T getIfSuccessful() {
- Preconditions.checkState(delegate.isDone());
- try {
- return delegate.get();
- } catch (CancellationException | InterruptedException | ExecutionException e) {
- throw new IllegalStateException(e);
- }
- }
-
- public T getChecked() throws InterruptedException, QueryException {
- try {
- return get();
- } catch (CancellationException e) {
- throw new InterruptedException();
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- Throwables.propagateIfPossible(cause, QueryException.class);
- Throwables.propagateIfPossible(cause, InterruptedException.class);
- throw new IllegalStateException(e.getCause());
- }
- }
- }
-
- @Override
- public <R> QueryTaskFuture<R> immediateSuccessfulFuture(R value) {
- return new QueryTaskFutureImpl<>(Futures.immediateFuture(value));
- }
-
- @Override
- public <R> QueryTaskFuture<R> immediateFailedFuture(QueryException e) {
- return new QueryTaskFutureImpl<>(Futures.<R>immediateFailedFuture(e));
- }
-
- @Override
- public <R> QueryTaskFuture<R> immediateCancelledFuture() {
- return new QueryTaskFutureImpl<>(Futures.<R>immediateCancelledFuture());
- }
-
- @Override
- public QueryTaskFuture<Void> eval(
- QueryExpression expr, VariableContext<T> context, Callback<T> callback) {
- return expr.eval(this, context, callback);
- }
-
- @Override
- public <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable) {
- try {
- return immediateSuccessfulFuture(callable.call());
- } catch (QueryException e) {
- return immediateFailedFuture(e);
- } catch (InterruptedException e) {
- return immediateCancelledFuture();
- }
- }
-
- @Override
- public <R> QueryTaskFuture<R> whenSucceedsCall(
- QueryTaskFuture<?> future, QueryTaskCallable<R> callable) {
- return whenAllSucceedCall(ImmutableList.of(future), callable);
- }
-
- private static class Dummy implements QueryTaskCallable<Void> {
- public static final Dummy INSTANCE = new Dummy();
-
- private Dummy() {}
-
- @Override
- public Void call() {
- return null;
- }
- }
-
- @Override
- public QueryTaskFuture<Void> whenAllSucceed(Iterable<? extends QueryTaskFuture<?>> futures) {
- return whenAllSucceedCall(futures, Dummy.INSTANCE);
- }
-
- @Override
- public <R> QueryTaskFuture<R> whenAllSucceedCall(
- Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable) {
- return QueryTaskFutureImpl.ofDelegate(
- Futures.whenAllSucceed(cast(futures)).call(callable));
- }
-
- @Override
- public <T1, T2> QueryTaskFuture<T2> transformAsync(
- QueryTaskFuture<T1> future,
- final Function<T1, QueryTaskFuture<T2>> function) {
- return QueryTaskFutureImpl.ofDelegate(
- Futures.transformAsync(
- (QueryTaskFutureImpl<T1>) future,
- new AsyncFunction<T1, T2>() {
- @Override
- public ListenableFuture<T2> apply(T1 input) throws Exception {
- return (QueryTaskFutureImpl<T2>) function.apply(input);
- }
- }));
- }
-
- protected static Iterable<QueryTaskFutureImpl<?>> cast(
- Iterable<? extends QueryTaskFuture<?>> futures) {
- return Iterables.transform(
- futures,
- new Function<QueryTaskFuture<?>, QueryTaskFutureImpl<?>>() {
- @Override
- public QueryTaskFutureImpl<?> apply(QueryTaskFuture<?> future) {
- return (QueryTaskFutureImpl<?>) future;
- }
- });
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
index 81be4c8ca2..adc12d278f 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java
@@ -21,12 +21,12 @@ import com.google.common.collect.Sets;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
/**
* Implementation of the <code>allpaths()</code> function.
@@ -51,47 +51,46 @@ public class AllPathsFunction implements QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
- final QueryEnvironment<T> env,
+ public <T> void eval(
+ QueryEnvironment<T> env,
VariableContext<T> context,
- final QueryExpression expression,
+ QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) {
- final QueryTaskFuture<Set<T>> fromValueFuture =
- QueryUtil.evalAll(env, context, args.get(0).getExpression());
- final QueryTaskFuture<Set<T>> toValueFuture =
- QueryUtil.evalAll(env, context, args.get(1).getExpression());
+ Callback<T> callback) throws QueryException, InterruptedException {
+
+ Set<T> fromValue = QueryUtil.evalAll(env, context, args.get(0).getExpression());
+ Set<T> toValue = QueryUtil.evalAll(env, context, args.get(1).getExpression());
- return env.whenAllSucceedCall(
- ImmutableList.of(fromValueFuture, toValueFuture),
- new QueryTaskCallable<Void>() {
- @Override
- public Void call() throws QueryException, InterruptedException {
- // Algorithm: compute "reachableFromX", the forward transitive closure of
- // the "from" set, then find the intersection of "reachableFromX" with the
- // reverse transitive closure of the "to" set. The reverse transitive
- // closure and intersection operations are interleaved for efficiency.
- // "result" holds the intersection.
+ // Algorithm: compute "reachableFromX", the forward transitive closure of
+ // the "from" set, then find the intersection of "reachableFromX" with the
+ // reverse transitive closure of the "to" set. The reverse transitive
+ // closure and intersection operations are interleaved for efficiency.
+ // "result" holds the intersection.
- Set<T> fromValue = fromValueFuture.getIfSuccessful();
- Set<T> toValue = toValueFuture.getIfSuccessful();
+ env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
- env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
+ Set<T> reachableFromX = env.getTransitiveClosure(fromValue);
+ Predicate<T> reachable = Predicates.in(reachableFromX);
+ Uniquifier<T> uniquifier = env.createUniquifier();
+ Collection<T> result = uniquifier.unique(intersection(reachableFromX, toValue));
+ callback.process(result);
+ Collection<T> worklist = result;
+ while (!worklist.isEmpty()) {
+ Collection<T> reverseDeps = env.getReverseDeps(worklist);
+ worklist = uniquifier.unique(Iterables.filter(reverseDeps, reachable));
+ callback.process(worklist);
+ }
+ }
- Set<T> reachableFromX = env.getTransitiveClosure(fromValue);
- Predicate<T> reachable = Predicates.in(reachableFromX);
- Uniquifier<T> uniquifier = env.createUniquifier();
- Collection<T> result = uniquifier.unique(intersection(reachableFromX, toValue));
- callback.process(result);
- Collection<T> worklist = result;
- while (!worklist.isEmpty()) {
- Collection<T> reverseDeps = env.getReverseDeps(worklist);
- worklist = uniquifier.unique(Iterables.filter(reverseDeps, reachable));
- callback.process(worklist);
- }
- return null;
- }
- });
+ @Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ eval(env, context, expression, args, callback);
}
/**
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
index f800d804dc..518b67497b 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java
@@ -13,7 +13,6 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
@@ -21,9 +20,10 @@ import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
/**
* An "allrdeps" query expression, which computes the reverse dependencies of the argument within
@@ -52,34 +52,30 @@ public class AllRdepsFunction implements QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
+ public <T> void eval(
QueryEnvironment<T> env,
VariableContext<T> context,
QueryExpression expression,
List<Argument> args,
- Callback<T> callback) {
- return eval(env, context, args, callback, Optional.<Predicate<T>>absent());
+ Callback<T> callback) throws QueryException, InterruptedException {
+ eval(env, context, args, callback, Predicates.<T>alwaysTrue());
}
- protected <T> QueryTaskFuture<Void> eval(
+ protected <T> void eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final List<Argument> args,
final Callback<T> callback,
- Optional<Predicate<T>> universeMaybe) {
+ final Predicate<T> universe)
+ throws QueryException, InterruptedException {
+
final int depth = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
- final Predicate<T> universe = universeMaybe.isPresent()
- ? universeMaybe.get()
- : Predicates.<T>alwaysTrue();
if (env instanceof StreamableQueryEnvironment<?>) {
- StreamableQueryEnvironment<T> streamableEnv = ((StreamableQueryEnvironment<T>) env);
- return depth == Integer.MAX_VALUE && !universeMaybe.isPresent()
- ? streamableEnv.getAllRdepsUnboundedParallel(args.get(0).getExpression(), context, callback)
- : streamableEnv.getAllRdeps(
- args.get(0).getExpression(), universe, context, callback, depth);
+ ((StreamableQueryEnvironment<T>) env)
+ .getAllRdeps(args.get(0).getExpression(), universe, context, callback, depth);
} else {
final Uniquifier<T> uniquifier = env.createUniquifier();
- return env.eval(
+ env.eval(
args.get(0).getExpression(),
context,
new Callback<T>() {
@@ -107,4 +103,21 @@ public class AllRdepsFunction implements QueryFunction {
});
}
}
+
+ @Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ boolean unbounded = args.size() == 1;
+ if (unbounded && env instanceof StreamableQueryEnvironment<?>) {
+ ((StreamableQueryEnvironment<T>) env).getAllRdepsUnboundedParallel(
+ args.get(0).getExpression(), context, callback, forkJoinPool);
+ } else {
+ eval(env, context, expression, args, callback);
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
index f9d20dbb19..89374d0a94 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
@@ -13,16 +13,16 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind;
+import com.google.devtools.build.lib.query2.engine.ParallelQueryUtils.QueryTask;
import com.google.devtools.build.lib.util.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
/**
* A binary algebraic set operation.
@@ -56,84 +56,40 @@ public class BinaryOperatorExpression extends QueryExpression {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
- switch (operator) {
- case PLUS:
- case UNION:
- return evalPlus(operands, env, context, callback);
- case MINUS:
- case EXCEPT:
- return evalMinus(operands, env, context, callback);
- case INTERSECT:
- case CARET:
- return evalIntersect(env, context, callback);
- default:
- throw new IllegalStateException(operator.toString());
- }
- }
+ protected <T> void evalImpl(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
+ throws QueryException, InterruptedException {
- /**
- * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions
- * separately.
- *
- * <p>N.B. {@code operands.size()} may be {@code 1}.
- */
- private static <T> QueryTaskFuture<Void> evalPlus(
- ImmutableList<QueryExpression> operands,
- QueryEnvironment<T> env,
- VariableContext<T> context,
- Callback<T> callback) {
- ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(operands.size());
- for (QueryExpression operand : operands) {
- queryTasks.add(env.eval(operand, context, callback));
+ if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
+ for (QueryExpression operand : operands) {
+ env.eval(operand, context, callback);
+ }
+ return;
}
- return env.whenAllSucceed(queryTasks);
- }
- /**
- * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to
- * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side separately.
- */
- private static <T> QueryTaskFuture<Void> evalMinus(
- final ImmutableList<QueryExpression> operands,
- final QueryEnvironment<T> env,
- final VariableContext<T> context,
- final Callback<T> callback) {
- QueryTaskFuture<Set<T>> lhsValueFuture = QueryUtil.evalAll(env, context, operands.get(0));
- Function<Set<T>, QueryTaskFuture<Void>> substractAsyncFunction =
- new Function<Set<T>, QueryTaskFuture<Void>>() {
- @Override
- public QueryTaskFuture<Void> apply(Set<T> lhsValue) {
- final Set<T> threadSafeLhsValue = Sets.newConcurrentHashSet(lhsValue);
- Callback<T> subtractionCallback = new Callback<T>() {
- @Override
- public void process(Iterable<T> partialResult) {
- for (T target : partialResult) {
- threadSafeLhsValue.remove(target);
- }
- }
- };
- QueryTaskFuture<Void> rhsEvaluatedFuture = evalPlus(
- operands.subList(1, operands.size()), env, context, subtractionCallback);
- return env.whenSucceedsCall(
- rhsEvaluatedFuture,
- new QueryTaskCallable<Void>() {
+ // Once we have fully evaluated the left-hand side, we can stream-process the right-hand side
+ // for minus operations. Note that this is suboptimal if the left-hand side results are very
+ // large compared to the right-hand side. Which is the case is hard to know before evaluating.
+ // We could consider determining this dynamically, however, by evaluating both the left and
+ // right hand side partially until one side finishes sooner.
+ final Set<T> lhsValue = QueryUtil.evalAll(env, context, operands.get(0));
+ if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) {
+ for (int i = 1; i < operands.size(); i++) {
+ env.eval(operands.get(i), context,
+ new Callback<T>() {
@Override
- public Void call() throws QueryException, InterruptedException {
- callback.process(threadSafeLhsValue);
- return null;
+ public void process(Iterable<T> partialResult)
+ throws QueryException, InterruptedException {
+ for (T target : partialResult) {
+ lhsValue.remove(target);
+ }
}
});
}
- };
- return env.transformAsync(lhsValueFuture, substractAsyncFunction);
- }
+ callback.process(lhsValue);
+ return;
+ }
- private <T> QueryTaskFuture<Void> evalIntersect(
- final QueryEnvironment<T> env,
- final VariableContext<T> context,
- final Callback<T> callback) {
// For each right-hand side operand, intersection cannot be performed in a streaming manner; the
// entire result of that operand is needed. So, in order to avoid pinning too much in memory at
// once, we process each right-hand side operand one at a time and throw away that operand's
@@ -141,39 +97,77 @@ public class BinaryOperatorExpression extends QueryExpression {
// TODO(bazel-team): Consider keeping just the name / label of the right-hand side results
// instead of the potentially heavy-weight instances of type T. This would let us process all
// right-hand side operands in parallel without worrying about memory usage.
- QueryTaskFuture<Set<T>> rollingResultFuture = QueryUtil.evalAll(env, context, operands.get(0));
+ Preconditions.checkState(operator == TokenKind.INTERSECT || operator == TokenKind.CARET,
+ operator);
for (int i = 1; i < operands.size(); i++) {
- final int index = i;
- Function<Set<T>, QueryTaskFuture<Set<T>>> evalOperandAndIntersectAsyncFunction =
- new Function<Set<T>, QueryTaskFuture<Set<T>>>() {
- @Override
- public QueryTaskFuture<Set<T>> apply(final Set<T> rollingResult) {
- final QueryTaskFuture<Set<T>> rhsOperandValueFuture =
- QueryUtil.evalAll(env, context, operands.get(index));
- return env.whenSucceedsCall(
- rhsOperandValueFuture,
- new QueryTaskCallable<Set<T>>() {
- @Override
- public Set<T> call() throws QueryException, InterruptedException {
- rollingResult.retainAll(rhsOperandValueFuture.getIfSuccessful());
- return rollingResult;
- }
- });
- }
- };
- rollingResultFuture =
- env.transformAsync(rollingResultFuture, evalOperandAndIntersectAsyncFunction);
+ lhsValue.retainAll(QueryUtil.evalAll(env, context, operands.get(i)));
}
- final QueryTaskFuture<Set<T>> resultFuture = rollingResultFuture;
- return env.whenSucceedsCall(
- resultFuture,
- new QueryTaskCallable<Void>() {
- @Override
- public Void call() throws QueryException, InterruptedException {
- callback.process(resultFuture.getIfSuccessful());
- return null;
- }
- });
+ callback.process(lhsValue);
+ }
+
+ @Override
+ protected <T> void parEvalImpl(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool)
+ throws QueryException, InterruptedException {
+ if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
+ parEvalPlus(operands, env, context, callback, forkJoinPool);
+ } else if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) {
+ parEvalMinus(operands, env, context, callback, forkJoinPool);
+ } else {
+ evalImpl(env, context, callback);
+ }
+ }
+
+ /**
+ * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions
+ * in parallel.
+ */
+ private static <T> void parEvalPlus(
+ ImmutableList<QueryExpression> operands,
+ final QueryEnvironment<T> env,
+ final VariableContext<T> context,
+ final ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool)
+ throws QueryException, InterruptedException {
+ ArrayList<QueryTask> queryTasks = new ArrayList<>(operands.size());
+ for (final QueryExpression operand : operands) {
+ queryTasks.add(new QueryTask() {
+ @Override
+ public void execute() throws QueryException, InterruptedException {
+ env.eval(operand, context, callback);
+ }
+ });
+ }
+ ParallelQueryUtils.executeQueryTasksAndWaitInterruptiblyFailFast(queryTasks, forkJoinPool);
+ }
+
+ /**
+ * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to
+ * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side in parallel.
+ */
+ private static <T> void parEvalMinus(
+ ImmutableList<QueryExpression> operands,
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool)
+ throws QueryException, InterruptedException {
+ final Set<T> lhsValue =
+ Sets.newConcurrentHashSet(QueryUtil.evalAll(env, context, operands.get(0)));
+ ThreadSafeCallback<T> subtractionCallback = new ThreadSafeCallback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
+ for (T target : partialResult) {
+ lhsValue.remove(target);
+ }
+ }
+ };
+ parEvalPlus(
+ operands.subList(1, operands.size()), env, context, subtractionCallback, forkJoinPool);
+ callback.process(lhsValue);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
index cbc0ae8d38..d2a2eb0fa8 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java
@@ -19,9 +19,10 @@ import com.google.devtools.build.lib.collect.CompactHashSet;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
/**
* A buildfiles(x) query expression, which computes the set of BUILD files and
@@ -41,17 +42,18 @@ class BuildFilesFunction implements QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
+ public <T> void eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) {
+ final Callback<T> callback)
+ throws QueryException, InterruptedException {
final Uniquifier<T> uniquifier = env.createUniquifier();
- return env.eval(
+ env.eval(
args.get(0).getExpression(),
context,
- new Callback<T>() {
+ new ThreadSafeCallback<T>() {
@Override
public void process(Iterable<T> partialResult)
throws QueryException, InterruptedException {
@@ -65,6 +67,18 @@ class BuildFilesFunction implements QueryFunction {
}
@Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ // 'eval' is written in such a way that it enables parallel evaluation of 'expression'.
+ eval(env, context, expression, args, callback);
+ }
+
+ @Override
public int getMandatoryArguments() {
return 1;
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java
index 51c51fa99b..0f4321145d 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/Callback.java
@@ -13,17 +13,14 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.util.BatchCallback;
-import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
/**
* Query callback to be called by a {@link QueryExpression} when it has part of the computation
* result. Assuming the {@code QueryEnvironment} supports it, it would allow the caller
* to stream the results.
*/
-@ThreadSafe
-public interface Callback<T> extends ThreadSafeBatchCallback<T, QueryException> {
+public interface Callback<T> extends BatchCallback<T, QueryException> {
/**
* According to the {@link BatchCallback} interface, repeated elements may be passed in here.
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
index 7317a35028..5eca701702 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java
@@ -18,10 +18,10 @@ import com.google.common.collect.Sets;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
/**
* A "deps" query expression, which computes the dependencies of the argument. An optional
@@ -53,15 +53,15 @@ final class DepsFunction implements QueryFunction {
* Breadth-first search from the arguments.
*/
@Override
- public <T> QueryTaskFuture<Void> eval(
+ public <T> void eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) {
+ final Callback<T> callback) throws QueryException, InterruptedException {
final int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE;
final Uniquifier<T> uniquifier = env.createUniquifier();
- return env.eval(args.get(0).getExpression(), context, new Callback<T>() {
+ env.eval(args.get(0).getExpression(), context, new Callback<T>() {
@Override
public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
Collection<T> current = Sets.newHashSet(partialResult);
@@ -83,4 +83,15 @@ final class DepsFunction implements QueryFunction {
}
});
}
+
+ @Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ eval(env, context, expression, args, callback);
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java
index 85cfe9fddb..a31196aba5 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/FunctionExpression.java
@@ -20,9 +20,10 @@ import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
/**
* A query expression for user-defined query functions.
@@ -45,9 +46,19 @@ public class FunctionExpression extends QueryExpression {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
- return function.eval(env, context, this, args, callback);
+ protected <T> void evalImpl(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
+ throws QueryException, InterruptedException {
+ function.eval(env, context, this, args, callback);
+ }
+
+ @Override
+ protected <T> void parEvalImpl(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ function.parEval(env, context, this, args, callback, forkJoinPool);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
index 1d68573578..4fa428adb5 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java
@@ -17,9 +17,9 @@ import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
/**
* A label(attr_name, argument) expression, which computes the set of targets
@@ -52,15 +52,16 @@ class LabelsFunction implements QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
+ public <T> void eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final QueryExpression expression,
final List<Argument> args,
- final Callback<T> callback) {
+ final Callback<T> callback)
+ throws QueryException, InterruptedException {
final String attrName = args.get(0).getWord();
final Uniquifier<T> uniquifier = env.createUniquifier();
- return env.eval(args.get(1).getExpression(), context, new Callback<T>() {
+ env.eval(args.get(1).getExpression(), context, new Callback<T>() {
@Override
public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
for (T input : partialResult) {
@@ -79,4 +80,15 @@ class LabelsFunction implements QueryFunction {
}
});
}
+
+ @Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ eval(env, context, expression, args, callback);
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java
index a7c3abeb62..64d94da19a 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LetExpression.java
@@ -13,8 +13,6 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.common.base.Function;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.Collection;
import java.util.Set;
import java.util.regex.Pattern;
@@ -66,24 +64,15 @@ class LetExpression extends QueryExpression {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
- final QueryEnvironment<T> env,
- final VariableContext<T> context,
- final Callback<T> callback) {
+ protected <T> void evalImpl(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
+ throws QueryException, InterruptedException {
if (!NAME_PATTERN.matcher(varName).matches()) {
- return env.immediateFailedFuture(
- new QueryException(this, "invalid variable name '" + varName + "' in let expression"));
+ throw new QueryException(this, "invalid variable name '" + varName + "' in let expression");
}
- QueryTaskFuture<Set<T>> varValueFuture = QueryUtil.evalAll(env, context, varExpr);
- Function<Set<T>, QueryTaskFuture<Void>> evalBodyAsyncFunction =
- new Function<Set<T>, QueryTaskFuture<Void>>() {
- @Override
- public QueryTaskFuture<Void> apply(Set<T> varValue) {
- VariableContext<T> bodyContext = VariableContext.with(context, varName, varValue);
- return env.eval(bodyExpr, bodyContext, callback);
- }
- };
- return env.transformAsync(varValueFuture, evalBodyAsyncFunction);
+ Set<T> varValue = QueryUtil.evalAll(env, context, varExpr);
+ VariableContext<T> bodyContext = VariableContext.with(context, varName, varValue);
+ env.eval(bodyExpr, bodyContext, callback);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
index 80b912f6d7..311a6afff5 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java
@@ -16,9 +16,10 @@ package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
/**
* A loadfiles(x) query expression, which computes the set of .bzl files
@@ -37,14 +38,15 @@ class LoadFilesFunction implements QueryEnvironment.QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
+ public <T> void eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
final QueryExpression expression,
List<QueryEnvironment.Argument> args,
- final Callback<T> callback) {
+ final Callback<T> callback)
+ throws QueryException, InterruptedException {
final Uniquifier<T> uniquifier = env.createUniquifier();
- return env.eval(
+ env.eval(
args.get(0).getExpression(),
context,
new Callback<T>() {
@@ -65,6 +67,17 @@ class LoadFilesFunction implements QueryEnvironment.QueryFunction {
}
@Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ eval(env, context, expression, args, callback);
+ }
+
+ @Override
public int getMandatoryArguments() {
return 1;
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java
index 50708d6816..5d21c874be 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/OutputFormatterCallback.java
@@ -46,7 +46,7 @@ public abstract class OutputFormatterCallback<T> implements Callback<T> {
* disambiguate between real interruptions or IO Exceptions.
*/
@Override
- public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
+ public final void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
try {
processOutput(partialResult);
} catch (IOException e) {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
new file mode 100644
index 0000000000..6e22709deb
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
@@ -0,0 +1,188 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.common.collect.Iterables;
+import com.google.devtools.build.lib.concurrent.MoreFutures;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+import java.util.concurrent.Future;
+
+/** Several utilities to aid in writing {@link QueryExpression#parEvalImpl} implementations. */
+public class ParallelQueryUtils {
+ /**
+ * Encapsulation of a subtask of parallel evaluation of a {@link QueryExpression}. See
+ * {@link #executeQueryTasksAndWaitInterruptiblyFailFast}.
+ */
+ @ThreadSafe
+ public interface QueryTask {
+ void execute() throws QueryException, InterruptedException;
+ }
+
+ /**
+ * Executes the given {@link QueryTask}s using the given {@link ForkJoinPool} and interruptibly
+ * waits for their completion. Throws the first {@link QueryException} encountered during parallel
+ * execution or an {@link InterruptedException} if the calling thread is interrupted.
+ *
+ * <p>These "fail-fast" semantics are desirable to avoid doing unneeded work when evaluating
+ * multiple {@link QueryTask}s in parallel: if serial execution of the tasks would result in a
+ * {@link QueryException} then we want parallel execution to do so as well, but there's no need to
+ * continue waiting for completion of the tasks after at least one of them results in a
+ * {@link QueryException}.
+ */
+ public static void executeQueryTasksAndWaitInterruptiblyFailFast(
+ List<QueryTask> queryTasks,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ int numTasks = queryTasks.size();
+ if (numTasks == 1) {
+ Iterables.getOnlyElement(queryTasks).execute();
+ return;
+ }
+ FailFastCountDownLatch failFastLatch = new FailFastCountDownLatch(numTasks);
+ ArrayList<QueryTaskForkJoinTask> forkJoinTasks = new ArrayList<>(numTasks);
+ for (QueryTask queryTask : queryTasks) {
+ QueryTaskForkJoinTask forkJoinTask = adaptAsForkJoinTask(queryTask, failFastLatch);
+ forkJoinTasks.add(forkJoinTask);
+ @SuppressWarnings("unused")
+ Future<?> possiblyIgnoredError = forkJoinPool.submit(forkJoinTask);
+ }
+ failFastLatch.await();
+ try {
+ MoreFutures.waitForAllInterruptiblyFailFast(forkJoinTasks);
+ } catch (ExecutionException e) {
+ throw rethrowCause(e);
+ }
+ }
+
+ private static QueryTaskForkJoinTask adaptAsForkJoinTask(
+ QueryTask queryTask,
+ FailFastCountDownLatch failFastLatch) {
+ return new QueryTaskForkJoinTask(queryTask, failFastLatch);
+ }
+
+ private static RuntimeException rethrowCause(ExecutionException e)
+ throws QueryException, InterruptedException {
+ Throwable cause = e.getCause();
+ if (cause instanceof ParallelRuntimeException) {
+ ((ParallelRuntimeException) cause).rethrow();
+ }
+ throw new IllegalStateException(e);
+ }
+
+ /**
+ * Wrapper around a {@link CountDownLatch} with initial count {@code n} that counts down once on
+ * "success" and {@code n} times on "failure".
+ *
+ * <p>This can be used in a concurrent context to wait until either {@code n} tasks are successful
+ * or at least one of them fails.
+ */
+ @ThreadSafe
+ private static class FailFastCountDownLatch {
+ private final int n;
+ private final CountDownLatch completionLatch;
+
+ private FailFastCountDownLatch(int n) {
+ this.n = n;
+ this.completionLatch = new CountDownLatch(n);
+ }
+
+ private void await() throws InterruptedException {
+ completionLatch.await();
+ }
+
+ private void countDown(boolean success) {
+ if (success) {
+ completionLatch.countDown();
+ } else {
+ for (int i = 0; i < n; i++) {
+ completionLatch.countDown();
+ }
+ }
+ }
+ }
+
+ // ForkJoinTask#adapt(Callable) wraps thrown checked exceptions as RuntimeExceptions. We avoid
+ // having to think about that messiness (which is inconsistent with other Future implementations)
+ // by having our own ForkJoinTask subclass and managing checked exceptions ourselves.
+ @ThreadSafe
+ private static class QueryTaskForkJoinTask extends ForkJoinTask<Void> {
+ private final QueryTask queryTask;
+ private final FailFastCountDownLatch completionLatch;
+
+ private QueryTaskForkJoinTask(QueryTask queryTask, FailFastCountDownLatch completionLatch) {
+ this.queryTask = queryTask;
+ this.completionLatch = completionLatch;
+ }
+
+ @Override
+ public Void getRawResult() {
+ return null;
+ }
+
+ @Override
+ protected void setRawResult(Void value) {
+ }
+
+ @Override
+ protected boolean exec() {
+ boolean successful = false;
+ try {
+ queryTask.execute();
+ successful = true;
+ return true;
+ } catch (QueryException queryException) {
+ throw new ParallelRuntimeQueryException(queryException);
+ } catch (InterruptedException interruptedException) {
+ throw new ParallelInterruptedQueryException(interruptedException);
+ } finally {
+ completionLatch.countDown(successful);
+ }
+ }
+ }
+
+ private abstract static class ParallelRuntimeException extends RuntimeException {
+ abstract void rethrow() throws QueryException, InterruptedException;
+ }
+
+ private static class ParallelRuntimeQueryException extends ParallelRuntimeException {
+ private final QueryException queryException;
+
+ private ParallelRuntimeQueryException(QueryException queryException) {
+ this.queryException = queryException;
+ }
+
+ @Override
+ void rethrow() throws QueryException, InterruptedException {
+ throw queryException;
+ }
+ }
+
+ private static class ParallelInterruptedQueryException extends ParallelRuntimeException {
+ private final InterruptedException interruptedException;
+
+ private ParallelInterruptedQueryException(InterruptedException interruptedException) {
+ this.interruptedException = interruptedException;
+ }
+
+ @Override
+ void rethrow() throws QueryException, InterruptedException {
+ throw interruptedException;
+ }
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
index 0c11eec8f2..879c9c1ddb 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java
@@ -13,13 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Callable;
+import java.util.concurrent.ForkJoinPool;
import javax.annotation.Nonnull;
/**
@@ -93,14 +91,16 @@ public interface QueryEnvironment<T> {
/** A user-defined query function. */
interface QueryFunction {
- /** Name of the function as it appears in the query language. */
+ /**
+ * Name of the function as it appears in the query language.
+ */
String getName();
/**
* The number of arguments that are required. The rest is optional.
*
- * <p>This should be greater than or equal to zero and at smaller than or equal to the length of
- * the list returned by {@link #getArgumentTypes}.
+ * <p>This should be greater than or equal to zero and at smaller than or equal to the length
+ * of the list returned by {@link #getArgumentTypes}.
*/
int getMandatoryArguments();
@@ -108,21 +108,34 @@ public interface QueryEnvironment<T> {
Iterable<ArgumentType> getArgumentTypes();
/**
- * Returns a {@link QueryTaskFuture} representing the asynchronous application of this
- * {@link QueryFunction} to the given {@code args}, feeding the results to the given
- * {@code callback}.
+ * Called when a user-defined function is to be evaluated.
*
* @param env the query environment this function is evaluated in.
* @param expression the expression being evaluated.
- * @param args the input arguments. These are type-checked against the specification returned by
- * {@link #getArgumentTypes} and {@link #getMandatoryArguments}
+ * @param args the input arguments. These are type-checked against the specification returned
+ * by {@link #getArgumentTypes} and {@link #getMandatoryArguments}
+ */
+ <T> void eval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ Callback<T> callback) throws QueryException, InterruptedException;
+
+ /**
+ * Same as {@link #eval(QueryEnvironment, VariableContext, QueryExpression, List, Callback)},
+ * except that this {@link QueryFunction} may use {@code forkJoinPool} to achieve
+ * parallelism.
+ *
+ * <p>The caller must ensure that {@code env} is thread safe.
*/
- <T> QueryTaskFuture<Void> eval(
+ <T> void parEval(
QueryEnvironment<T> env,
VariableContext<T> context,
QueryExpression expression,
List<Argument> args,
- Callback<T> callback);
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException;
}
/**
@@ -143,8 +156,18 @@ public interface QueryEnvironment<T> {
* Invokes {@code callback} with the set of target nodes in the graph for the specified target
* pattern, in 'blaze build' syntax.
*/
- QueryTaskFuture<Void> getTargetsMatchingPattern(
- QueryExpression owner, String pattern, Callback<T> callback);
+ void getTargetsMatchingPattern(QueryExpression owner, String pattern, Callback<T> callback)
+ throws QueryException, InterruptedException;
+
+ /**
+ * Same as {@link #getTargetsMatchingPattern}, but optionally making use of the given
+ * {@link ForkJoinPool} to achieve parallelism.
+ */
+ void getTargetsMatchingPatternPar(
+ QueryExpression owner,
+ String pattern,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException;
/** Ensures the specified target exists. */
// NOTE(bazel-team): this method is left here as scaffolding from a previous refactoring. It may
@@ -180,159 +203,14 @@ public interface QueryEnvironment<T> {
Set<T> getNodesOnPath(T from, T to) throws InterruptedException;
/**
- * Returns a {@link QueryTaskFuture} representing the asynchronous evaluation of the given
- * {@code expr} and passing of the results to the given {@code callback}.
+ * Eval an expression {@code expr} and pass the results to the {@code callback}.
*
* <p>Note that this method should guarantee that the callback does not see repeated elements.
- *
* @param expr The expression to evaluate
* @param callback The caller callback to notify when results are available
*/
- QueryTaskFuture<Void> eval(
- QueryExpression expr, VariableContext<T> context, Callback<T> callback);
-
- /**
- * An asynchronous computation of part of a query evaluation.
- *
- * <p>A {@link QueryTaskFuture} can only be produced from scratch via {@link #eval},
- * {@link #executeAsync}, {@link #immediateSuccessfulFuture}, {@link #immediateFailedFuture}, and
- * {@link #immediateCancelledFuture}.
- *
- * <p>Combined with the helper methods like {@link #whenSucceedsCall} below, this is very similar
- * to Guava's {@link ListenableFuture}.
- *
- * <p>This class is deliberately opaque; the only ways to compose/use {@link #QueryTaskFuture}
- * instances are the helper methods like {@link #whenSucceedsCall} below. A crucial consequence of
- * this is there is no way for a {@link QueryExpression} or {@link QueryFunction} implementation
- * to block on the result of a {@link #QueryTaskFuture}. This eliminates a large class of
- * deadlocks by design!
- */
- @ThreadSafe
- public abstract class QueryTaskFuture<T> {
- // We use a public abstract class with a private constructor so that this type is visible to all
- // the query codebase, but yet the only possible implementation is under our control in this
- // file.
- private QueryTaskFuture() {}
-
- /**
- * If this {@link QueryTasksFuture}'s encapsulated computation is currently complete and
- * successful, returns the result. This method is intended to be used in combination with
- * {@link #whenSucceedsCall}.
- *
- * <p>See the javadoc for the various helper methods that produce {@link QueryTasksFuture} for
- * the precise definition of "successful".
- */
- public abstract T getIfSuccessful();
- }
-
- /**
- * Returns a {@link QueryTaskFuture} representing the successful computation of {@code value}.
- *
- * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
- * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
- * {@link QueryTaskFuture#getIfSuccessful}.
- */
- abstract <R> QueryTaskFuture<R> immediateSuccessfulFuture(R value);
-
- /**
- * Returns a {@link QueryTaskFuture} representing a computation that was unsuccessful because of
- * {@code e}.
- *
- * <p>The returned {@link QueryTaskFuture} is considered "unsuccessful" for purposes of
- * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
- * {@link QueryTaskFuture#getIfSuccessful}.
- */
- abstract <R> QueryTaskFuture<R> immediateFailedFuture(QueryException e);
-
- /**
- * Returns a {@link QueryTaskFuture} representing a cancelled computation.
- *
- * <p>The returned {@link QueryTaskFuture} is considered "unsuccessful" for purposes of
- * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
- * {@link QueryTaskFuture#getIfSuccessful}.
- */
- abstract <R> QueryTaskFuture<R> immediateCancelledFuture();
-
- /** A {@link ThreadSafe} {@link Callable} for computations during query evaluation. */
- @ThreadSafe
- public interface QueryTaskCallable<T> extends Callable<T> {
- /**
- * Returns the computed value or throws a {@link QueryException} on failure or a
- * {@link InterruptedException} on interruption.
- */
- @Override
- T call() throws QueryException, InterruptedException;
- }
-
- /**
- * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being
- * performed asynchronously.
- *
- * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
- * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
- * {@link QueryTaskFuture#getIfSuccessful} iff {@code callable#call} does not throw an exception.
- */
- <R> QueryTaskFuture<R> executeAsync(QueryTaskCallable<R> callable);
-
- /**
- * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being
- * performed after the successful completion of the computation encapsulated by the given
- * {@code future} has completed successfully.
- *
- * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
- * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
- * {@link QueryTaskFuture#getIfSuccessful} iff {@code future} is successful and
- * {@code callable#call} does not throw an exception.
- */
- <R> QueryTaskFuture<R> whenSucceedsCall(QueryTaskFuture<?> future, QueryTaskCallable<R> callable);
-
- /**
- * Returns a {@link QueryTaskFuture} representing the successful completion of all the
- * computations encapsulated by the given {@code futures}.
- *
- * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
- * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
- * {@link QueryTaskFuture#getIfSuccessful} iff all of the given computations are "successful".
- */
- QueryTaskFuture<Void> whenAllSucceed(Iterable<? extends QueryTaskFuture<?>> futures);
-
- /**
- * Returns a {@link QueryTaskFuture} representing the given computation {@code callable} being
- * performed after the successful completion of all the computations encapsulated by the given
- * {@code futures}.
- *
- * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
- * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
- * {@link QueryTaskFuture#getIfSuccessful} iff all of the given computations are "successful" and
- * {@code callable#call} does not throw an exception.
- */
- <R> QueryTaskFuture<R> whenAllSucceedCall(
- Iterable<? extends QueryTaskFuture<?>> futures, QueryTaskCallable<R> callable);
-
- /**
- * Returns a {@link QueryTaskFuture} representing the asynchronous application of the given
- * {@code function} to the value produced by the computation encapsulated by the given
- * {@code future}.
- *
- * <p>The returned {@link QueryTaskFuture} is considered "successful" for purposes of
- * {@link #whenSucceedsCall}, {@link #whenAllSucceed}, and
- * {@link QueryTaskFuture#getIfSuccessful} iff {@code} future is "successful".
- */
- <T1, T2> QueryTaskFuture<T2> transformAsync(
- QueryTaskFuture<T1> future, Function<T1, QueryTaskFuture<T2>> function);
-
- /**
- * The sole package-protected subclass of {@link QueryTaskFuture}.
- *
- * <p>Do not subclass this class; it's an implementation detail. {@link QueryExpression} and
- * {@link QueryFunction} implementations should use {@link #eval} and {@link #executeAsync} to get
- * access to {@link QueryTaskFuture} instances and the then use the helper methods like
- * {@link #whenSucceedsCall} to transform them.
- */
- abstract class QueryTaskFutureImplBase<T> extends QueryTaskFuture<T> {
- protected QueryTaskFutureImplBase() {
- }
- }
+ void eval(QueryExpression expr, VariableContext<T> context, Callback<T> callback)
+ throws QueryException, InterruptedException;
/**
* Creates a Uniquifier for use in a {@code QueryExpression}. Note that the usage of this an
@@ -494,6 +372,9 @@ public interface QueryEnvironment<T> {
Set<QueryVisibility<T>> getVisibility(T from) throws QueryException, InterruptedException;
}
+ /** Returns the {@link QueryExpressionEvalListener} that this {@link QueryEnvironment} uses. */
+ QueryExpressionEvalListener<T> getEvalListener();
+
/** List of the default query functions. */
List<QueryFunction> DEFAULT_QUERY_FUNCTIONS =
ImmutableList.of(
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
index 920722db4b..e35e9e4807 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
@@ -14,8 +14,9 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+
import java.util.Collection;
+import java.util.concurrent.ForkJoinPool;
/**
* Base class for expressions in the Blaze query language, revision 2.
@@ -58,9 +59,9 @@ public abstract class QueryExpression {
protected QueryExpression() {}
/**
- * Returns a {@link QueryTaskFuture} representing the asynchronous evaluation of this query in the
- * specified environment, notifying the callback with a result. Note that it is allowed to notify
- * the callback with partial results instead of just one final result.
+ * Evaluates this query in the specified environment, and notifies the callback with a result.
+ * Note that it is allowed to notify the callback with partial results instead of just one final
+ * result.
*
* <p>Failures resulting from evaluation of an ill-formed query cause
* QueryException to be thrown.
@@ -70,10 +71,45 @@ public abstract class QueryExpression {
* thrown. If disabled, evaluation will stumble on to produce a (possibly
* inaccurate) result, but a result nonetheless.
*/
- public abstract <T> QueryTaskFuture<Void> eval(
+ public final <T> void eval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ Callback<T> callback) throws QueryException, InterruptedException {
+ env.getEvalListener().onEval(this, env, context, callback);
+ evalImpl(env, context, callback);
+ }
+
+ protected abstract <T> void evalImpl(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ Callback<T> callback) throws QueryException, InterruptedException;
+
+ /**
+ * Evaluates this query in the specified environment, as in
+ * {@link #eval(QueryEnvironment, VariableContext, Callback)}, using {@code forkJoinPool} to
+ * achieve parallelism.
+ *
+ * <p>The caller must ensure that {@code env} is thread safe.
+ */
+ @ThreadSafe
+ public final <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool)
+ throws QueryException, InterruptedException {
+ env.getEvalListener().onParEval(this, env, context, callback, forkJoinPool);
+ parEvalImpl(env, context, callback, forkJoinPool);
+ }
+
+ protected <T> void parEvalImpl(
QueryEnvironment<T> env,
VariableContext<T> context,
- Callback<T> callback);
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool)
+ throws QueryException, InterruptedException {
+ evalImpl(env, context, callback);
+ }
/**
* Collects all target patterns that are referenced anywhere within this query expression and adds
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java
new file mode 100644
index 0000000000..e6bdaef7a9
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java
@@ -0,0 +1,67 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import java.util.concurrent.ForkJoinPool;
+
+/** Listener for calls to the internal methods of {@link QueryExpression} used for evaluation. */
+@ThreadSafe
+public interface QueryExpressionEvalListener<T> {
+ /** Called right before {@link QueryExpression#evalImpl} is called. */
+ void onEval(
+ QueryExpression expr,
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ Callback<T> callback);
+
+ /** Called right before {@link QueryExpression#parEvalImpl} is called. */
+ void onParEval(
+ QueryExpression expr,
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool);
+
+ /** A {@link QueryExpressionEvalListener} that does nothing. */
+ class NullListener<T> implements QueryExpressionEvalListener<T> {
+ private static final NullListener<?> INSTANCE = new NullListener<>();
+
+ private NullListener() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public static <T> NullListener<T> instance() {
+ return (NullListener<T>) INSTANCE;
+ }
+
+ @Override
+ public void onEval(
+ QueryExpression expr,
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ Callback<T> callback) {
+ }
+
+ @Override
+ public void onParEval(
+ QueryExpression expr,
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) {
+ }
+ }
+}
+
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
index afb4192128..2d7be2ed74 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
@@ -13,13 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
-import com.google.common.collect.Sets;
import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.Collections;
import java.util.Set;
@@ -30,18 +28,17 @@ public final class QueryUtil {
/** A {@link Callback} that can aggregate all the partial results into one set. */
public interface AggregateAllCallback<T> extends Callback<T> {
- /** Returns a (mutable) set of all the results. */
Set<T> getResult();
}
- /** A {@link OutputFormatterCallback} that is also a {@link AggregateAllCallback}. */
+ /** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */
public abstract static class AggregateAllOutputFormatterCallback<T>
- extends ThreadSafeOutputFormatterCallback<T> implements AggregateAllCallback<T> {
+ extends OutputFormatterCallback<T> implements AggregateAllCallback<T> {
}
private static class AggregateAllOutputFormatterCallbackImpl<T>
extends AggregateAllOutputFormatterCallback<T> {
- private final Set<T> result = Sets.newConcurrentHashSet();
+ private final Set<T> result = CompactHashSet.create();
@Override
public final void processOutput(Iterable<T> partialResult) {
@@ -54,64 +51,65 @@ public final class QueryUtil {
}
}
- private static class OrderedAggregateAllOutputFormatterCallbackImpl<T>
- extends AggregateAllOutputFormatterCallback<T> {
- private final Set<T> result = CompactHashSet.create();
-
- @Override
- public final synchronized void processOutput(Iterable<T> partialResult) {
- Iterables.addAll(result, partialResult);
- }
-
- @Override
- public synchronized Set<T> getResult() {
- return result;
- }
- }
-
/**
- * Returns a fresh {@link AggregateAllOutputFormatterCallback} instance whose
- * {@link AggregateAllCallback#getResult} returns all the elements of the result in the order they
- * were processed.
+ * Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial
+ * results into one set.
+ *
+ * <p>Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with
+ * {@link #newAggregateAllCallback}.
*/
public static <T> AggregateAllOutputFormatterCallback<T>
- newOrderedAggregateAllOutputFormatterCallback() {
- return new OrderedAggregateAllOutputFormatterCallbackImpl<>();
+ newAggregateAllOutputFormatterCallback() {
+ return new AggregateAllOutputFormatterCallbackImpl<>();
}
- /** Returns a fresh {@link AggregateAllCallback} instance. */
+ /**
+ * Returns a fresh {@link AggregateAllCallback}.
+ *
+ * <p>Intended to be used by {@link QueryExpression} implementations; contrast with
+ * {@link #newAggregateAllOutputFormatterCallback}.
+ */
public static <T> AggregateAllCallback<T> newAggregateAllCallback() {
return new AggregateAllOutputFormatterCallbackImpl<>();
}
/**
- * Returns a {@link QueryTaskFuture} representing the evaluation of {@code expr} as a (mutable)
- * {@link Set} comprised of all the results.
+ * Fully evaluate a {@code QueryExpression} and return a set with all the results.
*
* <p>Should only be used by QueryExpressions when it is the only way of achieving correctness.
*/
- public static <T> QueryTaskFuture<Set<T>> evalAll(
- QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr) {
- final AggregateAllCallback<T> callback = newAggregateAllCallback();
- return env.whenSucceedsCall(
- env.eval(expr, context, callback),
- new QueryTaskCallable<Set<T>>() {
- @Override
- public Set<T> call() {
- return callback.getResult();
- }
- });
+ public static <T> Set<T> evalAll(
+ QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr)
+ throws QueryException, InterruptedException {
+ AggregateAllCallback<T> callback = newAggregateAllCallback();
+ env.eval(expr, context, callback);
+ return callback.getResult();
}
/** A trivial {@link Uniquifier} base class. */
- public abstract static class AbstractUniquifier<T, K> implements Uniquifier<T> {
- private final Set<K> alreadySeen;
+ public abstract static class AbstractUniquifier<T, K>
+ extends AbstractUniquifierBase<T, K> {
+ private final CompactHashSet<K> alreadySeen = CompactHashSet.create();
- protected AbstractUniquifier() {
- this(/*concurrencyLevel=*/ 1);
+ @Override
+ public final boolean unique(T element) {
+ return alreadySeen.add(extractKey(element));
}
- protected AbstractUniquifier(int concurrencyLevel) {
+ /**
+ * Extracts an unique key that can be used to dedupe the given {@code element}.
+ *
+ * <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
+ */
+ protected abstract K extractKey(T element);
+ }
+
+ /** A trivial {@link ThreadSafeUniquifier} base class. */
+ public abstract static class AbstractThreadSafeUniquifier<T, K>
+ extends AbstractUniquifierBase<T, K> implements ThreadSafeUniquifier<T> {
+ private final Set<K> alreadySeen;
+
+ protected AbstractThreadSafeUniquifier(int concurrencyLevel) {
this.alreadySeen = Collections.newSetFromMap(
new MapMaker().concurrencyLevel(concurrencyLevel).<K, Boolean>makeMap());
}
@@ -121,6 +119,15 @@ public final class QueryUtil {
return alreadySeen.add(extractKey(element));
}
+ /**
+ * Extracts an unique key that can be used to dedupe the given {@code element}.
+ *
+ * <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
+ */
+ protected abstract K extractKey(T element);
+ }
+
+ private abstract static class AbstractUniquifierBase<T, K> implements Uniquifier<T> {
@Override
public final ImmutableList<T> unique(Iterable<T> newElements) {
ImmutableList.Builder<T> result = ImmutableList.builder();
@@ -131,12 +138,5 @@ public final class QueryUtil {
}
return result.build();
}
-
- /**
- * Extracts an unique key that can be used to dedupe the given {@code element}.
- *
- * <p>Depending on the choice of {@code K}, this enables potential memory optimizations.
- */
- protected abstract K extractKey(T element);
}
-} \ No newline at end of file
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
index 82faf72538..7d691c0b04 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RdepsFunction.java
@@ -13,14 +13,12 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+
import java.util.List;
import java.util.Set;
@@ -56,31 +54,16 @@ public final class RdepsFunction extends AllRdepsFunction {
* towards the universe while staying within the transitive closure.
*/
@Override
- public <T> QueryTaskFuture<Void> eval(
- final QueryEnvironment<T> env,
- final VariableContext<T> context,
- final QueryExpression expression,
- final List<Argument> args,
- final Callback<T> callback) {
- QueryTaskFuture<Set<T>> universeValueFuture =
- QueryUtil.evalAll(env, context, args.get(0).getExpression());
- Function<Set<T>, QueryTaskFuture<Void>> evalInUniverseAsyncFunction =
- new Function<Set<T>, QueryTaskFuture<Void>>() {
- @Override
- public QueryTaskFuture<Void> apply(Set<T> universeValue) {
- Predicate<T> universe;
- try {
- env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE);
- universe = Predicates.in(env.getTransitiveClosure(universeValue));
- } catch (InterruptedException e) {
- return env.immediateCancelledFuture();
- } catch (QueryException e) {
- return env.immediateFailedFuture(e);
- }
- return RdepsFunction.this.eval(
- env, context, args.subList(1, args.size()), callback, Optional.of(universe));
- }
- };
- return env.transformAsync(universeValueFuture, evalInUniverseAsyncFunction);
+ public <T> void eval(QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args, Callback<T> callback)
+ throws QueryException,
+ InterruptedException {
+ Set<T> universeValue = QueryUtil.evalAll(env, context, args.get(0).getExpression());
+ env.buildTransitiveClosure(expression, universeValue, Integer.MAX_VALUE);
+
+ Predicate<T> universe = Predicates.in(env.getTransitiveClosure(universeValue));
+ eval(env, context, args.subList(1, args.size()), callback, universe);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
index 6b182ee7b1..9dc75a43e1 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java
@@ -18,8 +18,9 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -32,24 +33,25 @@ public abstract class RegexFilterExpression implements QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
+ public <T> void eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
QueryExpression expression,
final List<Argument> args,
- Callback<T> callback) {
+ Callback<T> callback)
+ throws QueryException, InterruptedException {
String rawPattern = getPattern(args);
final Pattern compiledPattern;
try {
compiledPattern = Pattern.compile(rawPattern);
} catch (PatternSyntaxException e) {
- return env.immediateFailedFuture(new QueryException(
+ throw new QueryException(
expression,
String.format(
"illegal '%s' pattern regexp '%s': %s",
getName(),
rawPattern,
- e.getMessage())));
+ e.getMessage()));
}
// Note that Patttern#matcher is thread-safe and so this Predicate can safely be used
@@ -66,10 +68,21 @@ public abstract class RegexFilterExpression implements QueryFunction {
}
};
- return env.eval(
+ env.eval(
Iterables.getLast(args).getExpression(),
context,
- new FilteredCallback<>(callback, matchFilter));
+ filteredCallback(callback, matchFilter));
+ }
+
+ @Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ eval(env, context, expression, args, callback);
}
/**
@@ -98,6 +111,21 @@ public abstract class RegexFilterExpression implements QueryFunction {
protected abstract String getPattern(List<Argument> args);
+ /**
+ * Returns a new {@link Callback} that forwards values that satisfies the given {@link Predicate}
+ * to the given {@code parentCallback}.
+ *
+ * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} iff
+ * {@code parentCallback} is as well.
+ */
+ private static <T> Callback<T> filteredCallback(
+ final Callback<T> parentCallback,
+ final Predicate<T> retainIfTrue) {
+ return (parentCallback instanceof ThreadSafeCallback)
+ ? new ThreadSafeFilteredCallback<>((ThreadSafeCallback<T>) parentCallback, retainIfTrue)
+ : new FilteredCallback<>(parentCallback, retainIfTrue);
+ }
+
private static class FilteredCallback<T> implements Callback<T> {
private final Callback<T> parentCallback;
private final Predicate<T> retainIfTrue;
@@ -120,4 +148,12 @@ public abstract class RegexFilterExpression implements QueryFunction {
return "filtered parentCallback of : " + retainIfTrue;
}
}
+
+ private static class ThreadSafeFilteredCallback<T>
+ extends FilteredCallback<T> implements ThreadSafeCallback<T> {
+ private ThreadSafeFilteredCallback(
+ ThreadSafeCallback<T> parentCallback, Predicate<T> retainIfTrue) {
+ super(parentCallback, retainIfTrue);
+ }
+ }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java
index e1eadf3aa5..ac4b460f40 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SetExpression.java
@@ -14,8 +14,7 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.base.Joiner;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
-import java.util.ArrayList;
+
import java.util.Collection;
import java.util.List;
@@ -47,13 +46,12 @@ class SetExpression extends QueryExpression {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
- ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(words.size());
+ protected <T> void evalImpl(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
+ throws QueryException, InterruptedException {
for (TargetLiteral expr : words) {
- queryTasks.add(env.eval(expr, context, callback));
+ env.eval(expr, context, callback);
}
- return env.whenAllSucceed(queryTasks);
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
index 4b07a99f07..8dc0442468 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java
@@ -19,9 +19,8 @@ import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.List;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -50,37 +49,36 @@ class SomeFunction implements QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
+ public <T> void eval(
QueryEnvironment<T> env,
VariableContext<T> context,
- final QueryExpression expression,
+ QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) {
+ final Callback<T> callback) throws QueryException, InterruptedException {
final AtomicBoolean someFound = new AtomicBoolean(false);
- QueryTaskFuture<Void> operandEvalFuture = env.eval(
- args.get(0).getExpression(),
- context,
- new Callback<T>() {
- @Override
- public void process(Iterable<T> partialResult)
- throws QueryException, InterruptedException {
- if (someFound.get() || Iterables.isEmpty(partialResult)) {
- return;
- }
- callback.process(ImmutableSet.of(partialResult.iterator().next()));
- someFound.set(true);
- }
- });
- return env.whenSucceedsCall(
- operandEvalFuture,
- new QueryTaskCallable<Void>() {
- @Override
- public Void call() throws QueryException {
- if (!someFound.get()) {
- throw new QueryException(expression, "argument set is empty");
- }
- return null;
- }
- });
+ env.eval(args.get(0).getExpression(), context, new Callback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
+ if (someFound.get() || Iterables.isEmpty(partialResult)) {
+ return;
+ }
+ callback.process(ImmutableSet.of(partialResult.iterator().next()));
+ someFound.set(true);
+ }
+ });
+ if (!someFound.get()) {
+ throw new QueryException(expression, "argument set is empty");
+ }
+ }
+
+ @Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ eval(env, context, expression, args, callback);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
index 229863c79a..2d0df0ef59 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java
@@ -20,10 +20,10 @@ import com.google.common.collect.Sets.SetView;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
+
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
/**
* A somepath(x, y) query expression, which computes the set of nodes
@@ -51,52 +51,50 @@ class SomePathFunction implements QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
- final QueryEnvironment<T> env,
+ public <T> void eval(
+ QueryEnvironment<T> env,
VariableContext<T> context,
- final QueryExpression expression,
+ QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) {
- final QueryTaskFuture<Set<T>> fromValueFuture =
- QueryUtil.evalAll(env, context, args.get(0).getExpression());
- final QueryTaskFuture<Set<T>> toValueFuture =
- QueryUtil.evalAll(env, context, args.get(1).getExpression());
+ final Callback<T> callback) throws QueryException, InterruptedException {
+ Set<T> fromValue = QueryUtil.evalAll(env, context, args.get(0).getExpression());
+ Set<T> toValue = QueryUtil.evalAll(env, context, args.get(1).getExpression());
- return env.whenAllSucceedCall(
- ImmutableList.of(fromValueFuture, toValueFuture),
- new QueryTaskCallable<Void>() {
- @Override
- public Void call() throws QueryException, InterruptedException {
- // Implementation strategy: for each x in "from", compute its forward
- // transitive closure. If it intersects "to", then do a path search from x
- // to an arbitrary node in the intersection, and return the path. This
- // avoids computing the full transitive closure of "from" in some cases.
+ // Implementation strategy: for each x in "from", compute its forward
+ // transitive closure. If it intersects "to", then do a path search from x
+ // to an arbitrary node in the intersection, and return the path. This
+ // avoids computing the full transitive closure of "from" in some cases.
- Set<T> fromValue = fromValueFuture.getIfSuccessful();
- Set<T> toValue = toValueFuture.getIfSuccessful();
+ env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
- env.buildTransitiveClosure(expression, fromValue, Integer.MAX_VALUE);
+ // This set contains all nodes whose TC does not intersect "toValue".
+ Uniquifier<T> uniquifier = env.createUniquifier();
- // This set contains all nodes whose TC does not intersect "toValue".
- Uniquifier<T> uniquifier = env.createUniquifier();
+ for (T x : uniquifier.unique(fromValue)) {
+ Set<T> xtc = env.getTransitiveClosure(ImmutableSet.of(x));
+ SetView<T> result;
+ if (xtc.size() > toValue.size()) {
+ result = Sets.intersection(toValue, xtc);
+ } else {
+ result = Sets.intersection(xtc, toValue);
+ }
+ if (!result.isEmpty()) {
+ callback.process(env.getNodesOnPath(x, result.iterator().next()));
+ return;
+ }
+ uniquifier.unique(xtc);
+ }
+ callback.process(ImmutableSet.<T>of());
+ }
- for (T x : uniquifier.unique(fromValue)) {
- Set<T> xtc = env.getTransitiveClosure(ImmutableSet.of(x));
- SetView<T> result;
- if (xtc.size() > toValue.size()) {
- result = Sets.intersection(toValue, xtc);
- } else {
- result = Sets.intersection(xtc, toValue);
- }
- if (!result.isEmpty()) {
- callback.process(env.getNodesOnPath(x, result.iterator().next()));
- return null;
- }
- uniquifier.unique(xtc);
- }
- callback.process(ImmutableSet.<T>of());
- return null;
- }
- });
+ @Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ eval(env, context, expression, args, callback);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
index bb67e93ad5..eda505a7ce 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java
@@ -14,6 +14,7 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.base.Predicate;
+import java.util.concurrent.ForkJoinPool;
/**
* The environment of a Blaze query which supports predefined streaming operations.
@@ -23,19 +24,22 @@ import com.google.common.base.Predicate;
public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> {
/** Retrieve and process all reverse dependencies of given expression in a streaming manner. */
- QueryTaskFuture<Void> getAllRdeps(
+ void getAllRdeps(
QueryExpression expression,
Predicate<T> universe,
VariableContext<T> context,
Callback<T> callback,
- int depth);
+ int depth)
+ throws QueryException, InterruptedException;
/**
* Similar to {@link #getAllRdeps} but finds all rdeps without a depth bound, making use of the
* provided {@code forkJoinPool}.
*/
- QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
+ void getAllRdepsUnboundedParallel(
QueryExpression expression,
VariableContext<T> context,
- Callback<T> callback);
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool)
+ throws QueryException, InterruptedException;
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java
deleted file mode 100644
index 68a79338b7..0000000000
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/SynchronizedDelegatingOutputFormatterCallback.java
+++ /dev/null
@@ -1,58 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.query2.engine;
-
-import java.io.IOException;
-import javax.annotation.Nullable;
-
-/**
- * A {@link ThreadSafeOutputFormatterCallback} wrapper around a {@link OutputFormatterCallback}
- * delegate.
- */
-public final class SynchronizedDelegatingOutputFormatterCallback<T>
- extends ThreadSafeOutputFormatterCallback<T> {
- private final OutputFormatterCallback<T> delegate;
-
- public SynchronizedDelegatingOutputFormatterCallback(OutputFormatterCallback<T> delegate) {
- this.delegate = delegate;
- }
-
- @Override
- public synchronized void start() throws IOException {
- delegate.start();
- }
-
- @Override
- public synchronized void close(boolean failFast) throws InterruptedException, IOException {
- delegate.close(failFast);
- }
-
- @Override
- public synchronized void process(Iterable<T> partialResult)
- throws QueryException, InterruptedException {
- delegate.process(partialResult);
- }
-
- @Override
- public synchronized void processOutput(Iterable<T> partialResult)
- throws IOException, InterruptedException {
- delegate.processOutput(partialResult);
- }
-
- @Override
- @Nullable
- public IOException getIoException() {
- return delegate.getIoException();
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
index 733bffb065..aeace9aa70 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TargetLiteral.java
@@ -13,10 +13,11 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.util.Preconditions;
+
import java.util.Collection;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
/**
* A literal set of targets, using 'blaze build' syntax. Or, a reference to a
@@ -44,31 +45,38 @@ public final class TargetLiteral extends QueryExpression {
return LetExpression.isValidVarReference(pattern);
}
- private <T> QueryTaskFuture<Void> evalVarReference(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
+ private <T> void evalVarReference(VariableContext<T> context, Callback<T> callback)
+ throws QueryException, InterruptedException {
String varName = LetExpression.getNameFromReference(pattern);
Set<T> value = context.get(varName);
if (value == null) {
- return env.immediateFailedFuture(
- new QueryException(this, "undefined variable '" + varName + "'"));
+ throw new QueryException(this, "undefined variable '" + varName + "'");
}
- try {
- callback.process(value);
- return env.immediateSuccessfulFuture(null);
- } catch (QueryException e) {
- return env.immediateFailedFuture(e);
- } catch (InterruptedException e) {
- return env.immediateCancelledFuture();
+ callback.process(value);
+ }
+
+ @Override
+ protected <T> void evalImpl(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
+ throws QueryException, InterruptedException {
+ if (isVariableReference()) {
+ evalVarReference(context, callback);
+ } else {
+ env.getTargetsMatchingPattern(this, pattern, callback);
}
}
@Override
- public <T> QueryTaskFuture<Void> eval(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
+ protected <T> void parEvalImpl(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool)
+ throws QueryException, InterruptedException {
if (isVariableReference()) {
- return evalVarReference(env, context, callback);
+ evalVarReference(context, callback);
} else {
- return env.getTargetsMatchingPattern(this, pattern, callback);
+ env.getTargetsMatchingPatternPar(this, pattern, callback, forkJoinPool);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
index d9ed576a5d..956e604a44 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java
@@ -15,11 +15,9 @@ package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Setting;
import java.util.ArrayList;
import java.util.Collection;
@@ -29,6 +27,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
/**
* A tests(x) filter expression, which returns all the tests in set x,
@@ -63,15 +62,15 @@ class TestsFunction implements QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
+ public <T> void eval(
final QueryEnvironment<T> env,
VariableContext<T> context,
QueryExpression expression,
List<Argument> args,
- final Callback<T> callback) {
+ final Callback<T> callback) throws QueryException, InterruptedException {
final Closure<T> closure = new Closure<>(expression, env);
- return env.eval(args.get(0).getExpression(), context, new Callback<T>() {
+ env.eval(args.get(0).getExpression(), context, new Callback<T>() {
@Override
public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
for (T target : partialResult) {
@@ -87,6 +86,17 @@ class TestsFunction implements QueryFunction {
});
}
+ @Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ eval(env, context, expression, args, callback);
+ }
+
/**
* Decides whether to include a test in a test_suite or not.
* @param testTags Collection of all tags exhibited by a given test.
@@ -141,8 +151,10 @@ class TestsFunction implements QueryFunction {
}
}
- /** A closure over the temporary state needed to compute the expression. */
- @ThreadSafe
+ /**
+ * A closure over the temporary state needed to compute the expression. This makes the evaluation
+ * thread-safe, as long as instances of this class are used only within a single thread.
+ */
private static final class Closure<T> {
private final QueryExpression expression;
/** A dynamically-populated mapping from test_suite rules to their tests. */
@@ -165,8 +177,7 @@ class TestsFunction implements QueryFunction {
*
* @precondition env.getAccessor().isTestSuite(testSuite)
*/
- private synchronized Set<T> getTestsInSuite(T testSuite)
- throws QueryException, InterruptedException {
+ private Set<T> getTestsInSuite(T testSuite) throws QueryException, InterruptedException {
Set<T> tests = testsInSuite.get(testSuite);
if (tests == null) {
tests = Sets.newHashSet();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java
new file mode 100644
index 0000000000..950335e38a
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeCallback.java
@@ -0,0 +1,23 @@
+// Copyright 2014 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.build.lib.util.ThreadSafeBatchCallback;
+
+/** Marker interface for a {@link Callback} that is {@link ThreadSafe}. */
+@ThreadSafe
+public interface ThreadSafeCallback<T>
+ extends Callback<T>, ThreadSafeBatchCallback<T, QueryException> {
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java
index bc3eb59f84..747185582f 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeOutputFormatterCallback.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ThreadSafeUniquifier.java
@@ -1,4 +1,4 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
+// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,7 +15,8 @@ package com.google.devtools.build.lib.query2.engine;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-/** A marker parent class for a {@link ThreadSafe} {@link OutputFormatterCallback}. */
+/** Marker interface for a {@link ThreadSafe} {@link Uniquifier}. */
@ThreadSafe
-public abstract class ThreadSafeOutputFormatterCallback<T> extends OutputFormatterCallback<T> {
-} \ No newline at end of file
+public interface ThreadSafeUniquifier<T> extends Uniquifier<T> {
+}
+
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java b/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java
index 5f8faf56b1..ed2b2376fa 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/Uniquifier.java
@@ -14,10 +14,8 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
/** A helper for deduping values. */
-@ThreadSafe
public interface Uniquifier<T> {
/** Returns whether {@code newElement} has been seen before. */
boolean unique(T newElement);
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
index b09910c715..532f331378 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java
@@ -14,14 +14,13 @@
package com.google.devtools.build.lib.query2.engine;
-import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction;
-import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
/**
* A visible(x, y) query expression, which computes the subset of nodes in y
@@ -53,32 +52,34 @@ public class VisibleFunction implements QueryFunction {
}
@Override
- public <T> QueryTaskFuture<Void> eval(
+ public <T> void eval(
final QueryEnvironment<T> env,
- final VariableContext<T> context,
+ VariableContext<T> context,
QueryExpression expression,
- final List<Argument> args,
- final Callback<T> callback) {
- final QueryTaskFuture<Set<T>> toSetFuture =
- QueryUtil.evalAll(env, context, args.get(0).getExpression());
- Function<Set<T>, QueryTaskFuture<Void>> computeVisibleNodesAsyncFunction =
- new Function<Set<T>, QueryTaskFuture<Void>>() {
- @Override
- public QueryTaskFuture<Void> apply(final Set<T> toSet) {
- return env.eval(args.get(1).getExpression(), context, new Callback<T>() {
- @Override
- public void process(Iterable<T> partialResult)
- throws QueryException, InterruptedException {
- for (T t : partialResult) {
- if (visibleToAll(env, toSet, t)) {
- callback.process(ImmutableList.of(t));
- }
- }
- }
- });
+ List<Argument> args,
+ final Callback<T> callback) throws QueryException, InterruptedException {
+ final Set<T> toSet = QueryUtil.evalAll(env, context, args.get(0).getExpression());
+ env.eval(args.get(1).getExpression(), context, new Callback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
+ for (T t : partialResult) {
+ if (visibleToAll(env, toSet, t)) {
+ callback.process(ImmutableList.of(t));
}
- };
- return env.transformAsync(toSetFuture, computeVisibleNodesAsyncFunction);
+ }
+ }
+ });
+ }
+
+ @Override
+ public <T> void parEval(
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ QueryExpression expression,
+ List<Argument> args,
+ ThreadSafeCallback<T> callback,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ eval(env, context, expression, args, callback);
}
/** Returns true if {@code target} is visible to all targets in {@code toSet}. */