From 7a5a236dfd099eb78e019482e9fc428b5b1182fd Mon Sep 17 00:00:00 2001 From: Nathan Harmata Date: Wed, 8 Mar 2017 22:42:01 +0000 Subject: Description redacted. -- PiperOrigin-RevId: 149585165 MOS_MIGRATED_REVID=149585165 --- .../query2/engine/BinaryOperatorExpression.java | 202 +++++++++++---------- 1 file changed, 104 insertions(+), 98 deletions(-) (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java') diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java index 89374d0a94..f9d20dbb19 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java @@ -13,16 +13,16 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; -import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind; -import com.google.devtools.build.lib.query2.engine.ParallelQueryUtils.QueryTask; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskCallable; +import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.util.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.ForkJoinPool; /** * A binary algebraic set operation. @@ -56,40 +56,84 @@ public class BinaryOperatorExpression extends QueryExpression { } @Override - protected void evalImpl( - QueryEnvironment env, VariableContext context, Callback callback) - throws QueryException, InterruptedException { + public QueryTaskFuture eval( + QueryEnvironment env, VariableContext context, Callback callback) { + switch (operator) { + case PLUS: + case UNION: + return evalPlus(operands, env, context, callback); + case MINUS: + case EXCEPT: + return evalMinus(operands, env, context, callback); + case INTERSECT: + case CARET: + return evalIntersect(env, context, callback); + default: + throw new IllegalStateException(operator.toString()); + } + } - if (operator == TokenKind.PLUS || operator == TokenKind.UNION) { - for (QueryExpression operand : operands) { - env.eval(operand, context, callback); - } - return; + /** + * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions + * separately. + * + *

N.B. {@code operands.size()} may be {@code 1}. + */ + private static QueryTaskFuture evalPlus( + ImmutableList operands, + QueryEnvironment env, + VariableContext context, + Callback callback) { + ArrayList> queryTasks = new ArrayList<>(operands.size()); + for (QueryExpression operand : operands) { + queryTasks.add(env.eval(operand, context, callback)); } + return env.whenAllSucceed(queryTasks); + } - // Once we have fully evaluated the left-hand side, we can stream-process the right-hand side - // for minus operations. Note that this is suboptimal if the left-hand side results are very - // large compared to the right-hand side. Which is the case is hard to know before evaluating. - // We could consider determining this dynamically, however, by evaluating both the left and - // right hand side partially until one side finishes sooner. - final Set lhsValue = QueryUtil.evalAll(env, context, operands.get(0)); - if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) { - for (int i = 1; i < operands.size(); i++) { - env.eval(operands.get(i), context, - new Callback() { + /** + * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to + * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side separately. + */ + private static QueryTaskFuture evalMinus( + final ImmutableList operands, + final QueryEnvironment env, + final VariableContext context, + final Callback callback) { + QueryTaskFuture> lhsValueFuture = QueryUtil.evalAll(env, context, operands.get(0)); + Function, QueryTaskFuture> substractAsyncFunction = + new Function, QueryTaskFuture>() { + @Override + public QueryTaskFuture apply(Set lhsValue) { + final Set threadSafeLhsValue = Sets.newConcurrentHashSet(lhsValue); + Callback subtractionCallback = new Callback() { + @Override + public void process(Iterable partialResult) { + for (T target : partialResult) { + threadSafeLhsValue.remove(target); + } + } + }; + QueryTaskFuture rhsEvaluatedFuture = evalPlus( + operands.subList(1, operands.size()), env, context, subtractionCallback); + return env.whenSucceedsCall( + rhsEvaluatedFuture, + new QueryTaskCallable() { @Override - public void process(Iterable partialResult) - throws QueryException, InterruptedException { - for (T target : partialResult) { - lhsValue.remove(target); - } + public Void call() throws QueryException, InterruptedException { + callback.process(threadSafeLhsValue); + return null; } }); } - callback.process(lhsValue); - return; - } + }; + return env.transformAsync(lhsValueFuture, substractAsyncFunction); + } + private QueryTaskFuture evalIntersect( + final QueryEnvironment env, + final VariableContext context, + final Callback callback) { // For each right-hand side operand, intersection cannot be performed in a streaming manner; the // entire result of that operand is needed. So, in order to avoid pinning too much in memory at // once, we process each right-hand side operand one at a time and throw away that operand's @@ -97,77 +141,39 @@ public class BinaryOperatorExpression extends QueryExpression { // TODO(bazel-team): Consider keeping just the name / label of the right-hand side results // instead of the potentially heavy-weight instances of type T. This would let us process all // right-hand side operands in parallel without worrying about memory usage. - Preconditions.checkState(operator == TokenKind.INTERSECT || operator == TokenKind.CARET, - operator); + QueryTaskFuture> rollingResultFuture = QueryUtil.evalAll(env, context, operands.get(0)); for (int i = 1; i < operands.size(); i++) { - lhsValue.retainAll(QueryUtil.evalAll(env, context, operands.get(i))); + final int index = i; + Function, QueryTaskFuture>> evalOperandAndIntersectAsyncFunction = + new Function, QueryTaskFuture>>() { + @Override + public QueryTaskFuture> apply(final Set rollingResult) { + final QueryTaskFuture> rhsOperandValueFuture = + QueryUtil.evalAll(env, context, operands.get(index)); + return env.whenSucceedsCall( + rhsOperandValueFuture, + new QueryTaskCallable>() { + @Override + public Set call() throws QueryException, InterruptedException { + rollingResult.retainAll(rhsOperandValueFuture.getIfSuccessful()); + return rollingResult; + } + }); + } + }; + rollingResultFuture = + env.transformAsync(rollingResultFuture, evalOperandAndIntersectAsyncFunction); } - callback.process(lhsValue); - } - - @Override - protected void parEvalImpl( - QueryEnvironment env, - VariableContext context, - ThreadSafeCallback callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - if (operator == TokenKind.PLUS || operator == TokenKind.UNION) { - parEvalPlus(operands, env, context, callback, forkJoinPool); - } else if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) { - parEvalMinus(operands, env, context, callback, forkJoinPool); - } else { - evalImpl(env, context, callback); - } - } - - /** - * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions - * in parallel. - */ - private static void parEvalPlus( - ImmutableList operands, - final QueryEnvironment env, - final VariableContext context, - final ThreadSafeCallback callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - ArrayList queryTasks = new ArrayList<>(operands.size()); - for (final QueryExpression operand : operands) { - queryTasks.add(new QueryTask() { - @Override - public void execute() throws QueryException, InterruptedException { - env.eval(operand, context, callback); - } - }); - } - ParallelQueryUtils.executeQueryTasksAndWaitInterruptiblyFailFast(queryTasks, forkJoinPool); - } - - /** - * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to - * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side in parallel. - */ - private static void parEvalMinus( - ImmutableList operands, - QueryEnvironment env, - VariableContext context, - ThreadSafeCallback callback, - ForkJoinPool forkJoinPool) - throws QueryException, InterruptedException { - final Set lhsValue = - Sets.newConcurrentHashSet(QueryUtil.evalAll(env, context, operands.get(0))); - ThreadSafeCallback subtractionCallback = new ThreadSafeCallback() { - @Override - public void process(Iterable partialResult) throws QueryException, InterruptedException { - for (T target : partialResult) { - lhsValue.remove(target); - } - } - }; - parEvalPlus( - operands.subList(1, operands.size()), env, context, subtractionCallback, forkJoinPool); - callback.process(lhsValue); + final QueryTaskFuture> resultFuture = rollingResultFuture; + return env.whenSucceedsCall( + resultFuture, + new QueryTaskCallable() { + @Override + public Void call() throws QueryException, InterruptedException { + callback.process(resultFuture.getIfSuccessful()); + return null; + } + }); } @Override -- cgit v1.2.3