diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java | 202 |
1 files changed, 104 insertions, 98 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java index 89374d0a94..f9d20dbb19 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java @@ -13,16 +13,16 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; -import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind; -import com.google.devtools.build.lib.query2.engine.ParallelQueryUtils.QueryTask; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.util.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A binary algebraic set operation. @@ -56,40 +56,84 @@ public class BinaryOperatorExpression extends QueryExpression { } @Override - protected <T> void evalImpl( - QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) - throws QueryException, InterruptedException { + public <T> QueryTaskFuture<Void> eval( + QueryEnvironment<T> env, VariableContext<T> context, Callback<T> callback) { + switch (operator) { + case PLUS: + case UNION: + return evalPlus(operands, env, context, callback); + case MINUS: + case EXCEPT: + return evalMinus(operands, env, context, callback); + case INTERSECT: + case CARET: + return evalIntersect(env, context, callback); + default: + throw new IllegalStateException(operator.toString()); + } + } - if (operator == TokenKind.PLUS || operator == TokenKind.UNION) { - for (QueryExpression operand : operands) { - env.eval(operand, context, callback); - } - return; + /** + * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions + * separately. + * + * <p>N.B. {@code operands.size()} may be {@code 1}. + */ + private static <T> QueryTaskFuture<Void> evalPlus( + ImmutableList<QueryExpression> operands, + QueryEnvironment<T> env, + VariableContext<T> context, + Callback<T> callback) { + ArrayList<QueryTaskFuture<Void>> queryTasks = new ArrayList<>(operands.size()); + for (QueryExpression operand : operands) { + queryTasks.add(env.eval(operand, context, callback)); } + return env.whenAllSucceed(queryTasks); + } - // Once we have fully evaluated the left-hand side, we can stream-process the right-hand side - // for minus operations. Note that this is suboptimal if the left-hand side results are very - // large compared to the right-hand side. Which is the case is hard to know before evaluating. - // We could consider determining this dynamically, however, by evaluating both the left and - // right hand side partially until one side finishes sooner. - final Set<T> lhsValue = QueryUtil.evalAll(env, context, operands.get(0)); - if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) { - for (int i = 1; i < operands.size(); i++) { - env.eval(operands.get(i), context, - new Callback<T>() { + /** + * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to + * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side separately. + */ + private static <T> QueryTaskFuture<Void> evalMinus( + final ImmutableList<QueryExpression> operands, + final QueryEnvironment<T> env, + final VariableContext<T> context, + final Callback<T> callback) { + QueryTaskFuture<Set<T>> lhsValueFuture = QueryUtil.evalAll(env, context, operands.get(0)); + Function<Set<T>, QueryTaskFuture<Void>> substractAsyncFunction = + new Function<Set<T>, QueryTaskFuture<Void>>() { + @Override + public QueryTaskFuture<Void> apply(Set<T> lhsValue) { + final Set<T> threadSafeLhsValue = Sets.newConcurrentHashSet(lhsValue); + Callback<T> subtractionCallback = new Callback<T>() { + @Override + public void process(Iterable<T> partialResult) { + for (T target : partialResult) { + threadSafeLhsValue.remove(target); + } + } + }; + QueryTaskFuture<Void> rhsEvaluatedFuture = evalPlus( + operands.subList(1, operands.size()), env, context, subtractionCallback); + return env.whenSucceedsCall( + rhsEvaluatedFuture, + new QueryTaskCallable<Void>() { @Override - public void process(Iterable<T> partialResult) - throws QueryException, InterruptedException { - for (T target : partialResult) { - lhsValue.remove(target); - } + public Void call() throws QueryException, InterruptedException { + callback.process(threadSafeLhsValue); + return null; } }); } - callback.process(lhsValue); - return; - } + }; + return env.transformAsync(lhsValueFuture, substractAsyncFunction); + } + private <T> QueryTaskFuture<Void> evalIntersect( + final QueryEnvironment<T> env, + final VariableContext<T> context, + final Callback<T> callback) { // For each right-hand side operand, intersection cannot be performed in a streaming manner; the // entire result of that operand is needed. So, in order to avoid pinning too much in memory at // once, we process each right-hand side operand one at a time and throw away that operand's @@ -97,77 +141,39 @@ public class BinaryOperatorExpression extends QueryExpression { // TODO(bazel-team): Consider keeping just the name / label of the right-hand side results // instead of the potentially heavy-weight instances of type T. This would let us process all // right-hand side operands in parallel without worrying about memory usage. - Preconditions.checkState(operator == TokenKind.INTERSECT || operator == TokenKind.CARET, - operator); + QueryTaskFuture<Set<T>> rollingResultFuture = QueryUtil.evalAll(env, context, operands.get(0)); for (int i = 1; i < operands.size(); i++) { - lhsValue.retainAll(QueryUtil.evalAll(env, context, operands.get(i))); + final int index = i; + Function<Set<T>, QueryTaskFuture<Set<T>>> evalOperandAndIntersectAsyncFunction = + new Function<Set<T>, QueryTaskFuture<Set<T>>>() { + @Override + public QueryTaskFuture<Set<T>> apply(final Set<T> rollingResult) { + final QueryTaskFuture<Set<T>> rhsOperandValueFuture = + QueryUtil.evalAll(env, context, operands.get(index)); + return env.whenSucceedsCall( + rhsOperandValueFuture, + new QueryTaskCallable<Set<T>>() { + @Override + public Set<T> call() throws QueryException, InterruptedException { + rollingResult.retainAll(rhsOperandValueFuture.getIfSuccessful()); + return rollingResult; + } + }); + } + }; + rollingResultFuture = + env.transformAsync(rollingResultFuture, evalOperandAndIntersectAsyncFunction); } - callback.process(lhsValue); - } - - @Override - protected <T> void parEvalImpl( - QueryEnvironment<T> env, - VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - if (operator == TokenKind.PLUS || operator == TokenKind.UNION) { - parEvalPlus(operands, env, context, callback, forkJoinPool); - } else if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) { - parEvalMinus(operands, env, context, callback, forkJoinPool); - } else { - evalImpl(env, context, callback); - } - } - - /** - * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions - * in parallel. - */ - private static <T> void parEvalPlus( - ImmutableList<QueryExpression> operands, - final QueryEnvironment<T> env, - final VariableContext<T> context, - final ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - ArrayList<QueryTask> queryTasks = new ArrayList<>(operands.size()); - for (final QueryExpression operand : operands) { - queryTasks.add(new QueryTask() { - @Override - public void execute() throws QueryException, InterruptedException { - env.eval(operand, context, callback); - } - }); - } - ParallelQueryUtils.executeQueryTasksAndWaitInterruptiblyFailFast(queryTasks, forkJoinPool); - } - - /** - * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to - * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side in parallel. - */ - private static <T> void parEvalMinus( - ImmutableList<QueryExpression> operands, - QueryEnvironment<T> env, - VariableContext<T> context, - ThreadSafeCallback<T> callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - final Set<T> lhsValue = - Sets.newConcurrentHashSet(QueryUtil.evalAll(env, context, operands.get(0))); - ThreadSafeCallback<T> subtractionCallback = new ThreadSafeCallback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - for (T target : partialResult) { - lhsValue.remove(target); - } - } - }; - parEvalPlus( - operands.subList(1, operands.size()), env, context, subtractionCallback, forkJoinPool); - callback.process(lhsValue); + final QueryTaskFuture<Set<T>> resultFuture = rollingResultFuture; + return env.whenSucceedsCall( + resultFuture, + new QueryTaskCallable<Void>() { + @Override + public Void call() throws QueryException, InterruptedException { + callback.process(resultFuture.getIfSuccessful()); + return null; + } + }); } @Override |