aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java202
1 files changed, 104 insertions, 98 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
index 89374d0a94..f9d20dbb19 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
@@ -13,16 +13,16 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
-import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind;
-import com.google.devtools.build.lib.query2.engine.ParallelQueryUtils.QueryTask;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable;
+import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
import com.google.devtools.build.lib.util.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.ForkJoinPool;
/**
* A binary algebraic set operation.
@@ -56,40 +56,84 @@ public class BinaryOperatorExpression extends QueryExpression {
}
@Override
- protected <T> void evalImpl(
- QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback)
- throws QueryException, InterruptedException {
+ public <T> QueryTaskFuture<Void> eval(
+ QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) {
+ switch (operator) {
+ case PLUS:
+ case UNION:
+ return evalPlus(operands, env, context, callback);
+ case MINUS:
+ case EXCEPT:
+ return evalMinus(operands, env, context, callback);
+ case INTERSECT:
+ case CARET:
+ return evalIntersect(env, context, callback);
+ default:
+ throw new IllegalStateException(operator.toString());
+ }
+ }
- if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
- for (QueryExpression operand : operands) {
- env.eval(operand, context, callback);
- }
- return;
+ /**
+ * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions
+ * separately.
+ *
+ * <p>N.B. {@code operands.size()} may be {@code 1}.
+ */
+ private static <T> QueryTaskFuture<Void> evalPlus(
+ ImmutableList<QueryExpression> operands,
+ QueryEnvironment<T> env,
+ VariableContext<T> context,
+ Callback<T> callback) {
+ ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(operands.size());
+ for (QueryExpression operand : operands) {
+ queryTasks.add(env.eval(operand, context, callback));
}
+ return env.whenAllSucceed(queryTasks);
+ }
- // Once we have fully evaluated the left-hand side, we can stream-process the right-hand side
- // for minus operations. Note that this is suboptimal if the left-hand side results are very
- // large compared to the right-hand side. Which is the case is hard to know before evaluating.
- // We could consider determining this dynamically, however, by evaluating both the left and
- // right hand side partially until one side finishes sooner.
- final Set<T> lhsValue = QueryUtil.evalAll(env, context, operands.get(0));
- if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) {
- for (int i = 1; i < operands.size(); i++) {
- env.eval(operands.get(i), context,
- new Callback<T>() {
+ /**
+ * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to
+ * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side separately.
+ */
+ private static <T> QueryTaskFuture<Void> evalMinus(
+ final ImmutableList<QueryExpression> operands,
+ final QueryEnvironment<T> env,
+ final VariableContext<T> context,
+ final Callback<T> callback) {
+ QueryTaskFuture<Set<T>> lhsValueFuture = QueryUtil.evalAll(env, context, operands.get(0));
+ Function<Set<T>, QueryTaskFuture<Void>> substractAsyncFunction =
+ new Function<Set<T>, QueryTaskFuture<Void>>() {
+ @Override
+ public QueryTaskFuture<Void> apply(Set<T> lhsValue) {
+ final Set<T> threadSafeLhsValue = Sets.newConcurrentHashSet(lhsValue);
+ Callback<T> subtractionCallback = new Callback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult) {
+ for (T target : partialResult) {
+ threadSafeLhsValue.remove(target);
+ }
+ }
+ };
+ QueryTaskFuture<Void> rhsEvaluatedFuture = evalPlus(
+ operands.subList(1, operands.size()), env, context, subtractionCallback);
+ return env.whenSucceedsCall(
+ rhsEvaluatedFuture,
+ new QueryTaskCallable<Void>() {
@Override
- public void process(Iterable<T> partialResult)
- throws QueryException, InterruptedException {
- for (T target : partialResult) {
- lhsValue.remove(target);
- }
+ public Void call() throws QueryException, InterruptedException {
+ callback.process(threadSafeLhsValue);
+ return null;
}
});
}
- callback.process(lhsValue);
- return;
- }
+ };
+ return env.transformAsync(lhsValueFuture, substractAsyncFunction);
+ }
+ private <T> QueryTaskFuture<Void> evalIntersect(
+ final QueryEnvironment<T> env,
+ final VariableContext<T> context,
+ final Callback<T> callback) {
// For each right-hand side operand, intersection cannot be performed in a streaming manner; the
// entire result of that operand is needed. So, in order to avoid pinning too much in memory at
// once, we process each right-hand side operand one at a time and throw away that operand's
@@ -97,77 +141,39 @@ public class BinaryOperatorExpression extends QueryExpression {
// TODO(bazel-team): Consider keeping just the name / label of the right-hand side results
// instead of the potentially heavy-weight instances of type T. This would let us process all
// right-hand side operands in parallel without worrying about memory usage.
- Preconditions.checkState(operator == TokenKind.INTERSECT || operator == TokenKind.CARET,
- operator);
+ QueryTaskFuture<Set<T>> rollingResultFuture = QueryUtil.evalAll(env, context, operands.get(0));
for (int i = 1; i < operands.size(); i++) {
- lhsValue.retainAll(QueryUtil.evalAll(env, context, operands.get(i)));
+ final int index = i;
+ Function<Set<T>, QueryTaskFuture<Set<T>>> evalOperandAndIntersectAsyncFunction =
+ new Function<Set<T>, QueryTaskFuture<Set<T>>>() {
+ @Override
+ public QueryTaskFuture<Set<T>> apply(final Set<T> rollingResult) {
+ final QueryTaskFuture<Set<T>> rhsOperandValueFuture =
+ QueryUtil.evalAll(env, context, operands.get(index));
+ return env.whenSucceedsCall(
+ rhsOperandValueFuture,
+ new QueryTaskCallable<Set<T>>() {
+ @Override
+ public Set<T> call() throws QueryException, InterruptedException {
+ rollingResult.retainAll(rhsOperandValueFuture.getIfSuccessful());
+ return rollingResult;
+ }
+ });
+ }
+ };
+ rollingResultFuture =
+ env.transformAsync(rollingResultFuture, evalOperandAndIntersectAsyncFunction);
}
- callback.process(lhsValue);
- }
-
- @Override
- protected <T> void parEvalImpl(
- QueryEnvironment<T> env,
- VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
- parEvalPlus(operands, env, context, callback, forkJoinPool);
- } else if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) {
- parEvalMinus(operands, env, context, callback, forkJoinPool);
- } else {
- evalImpl(env, context, callback);
- }
- }
-
- /**
- * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions
- * in parallel.
- */
- private static <T> void parEvalPlus(
- ImmutableList<QueryExpression> operands,
- final QueryEnvironment<T> env,
- final VariableContext<T> context,
- final ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- ArrayList<QueryTask> queryTasks = new ArrayList<>(operands.size());
- for (final QueryExpression operand : operands) {
- queryTasks.add(new QueryTask() {
- @Override
- public void execute() throws QueryException, InterruptedException {
- env.eval(operand, context, callback);
- }
- });
- }
- ParallelQueryUtils.executeQueryTasksAndWaitInterruptiblyFailFast(queryTasks, forkJoinPool);
- }
-
- /**
- * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to
- * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side in parallel.
- */
- private static <T> void parEvalMinus(
- ImmutableList<QueryExpression> operands,
- QueryEnvironment<T> env,
- VariableContext<T> context,
- ThreadSafeCallback<T> callback,
- ForkJoinPool forkJoinPool)
- throws QueryException, InterruptedException {
- final Set<T> lhsValue =
- Sets.newConcurrentHashSet(QueryUtil.evalAll(env, context, operands.get(0)));
- ThreadSafeCallback<T> subtractionCallback = new ThreadSafeCallback<T>() {
- @Override
- public void process(Iterable<T> partialResult) throws QueryException, InterruptedException {
- for (T target : partialResult) {
- lhsValue.remove(target);
- }
- }
- };
- parEvalPlus(
- operands.subList(1, operands.size()), env, context, subtractionCallback, forkJoinPool);
- callback.process(lhsValue);
+ final QueryTaskFuture<Set<T>> resultFuture = rollingResultFuture;
+ return env.whenSucceedsCall(
+ resultFuture,
+ new QueryTaskCallable<Void>() {
+ @Override
+ public Void call() throws QueryException, InterruptedException {
+ callback.process(resultFuture.getIfSuccessful());
+ return null;
+ }
+ });
}
@Override