diff options
author | 2016-11-21 17:04:48 +0000 | |
---|---|---|
committer | 2016-11-21 19:42:49 +0000 | |
commit | 324b592055378c305a31009ecfbba0cb78935003 (patch) | |
tree | 8345230cd87217c3eb8258a7260de13ba5f9bf86 /src/main/java | |
parent | c4e965e920e1fb6328407147c478513769d0a2c7 (diff) |
Provide a parallel implementation of "e1 - e2 - e3" by noting its equivalence to "e1 - (e2 + e3)" and the fact that we already have a parallel implementation of "e2 + e3".
--
MOS_MIGRATED_REVID=139792288
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java | 91 |
1 files changed, 67 insertions, 24 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java index fe724404cb..d1efe80ff8 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java @@ -15,6 +15,7 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; import com.google.devtools.build.lib.concurrent.MoreFutures; import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind; import com.google.devtools.build.lib.util.Preconditions; @@ -107,38 +108,80 @@ public class BinaryOperatorExpression extends QueryExpression { @Override protected <T> void parEvalImpl( - final QueryEnvironment<T> env, - final VariableContext<T> context, - final ThreadSafeCallback<T> callback, + QueryEnvironment<T> env, + VariableContext<T> context, + ThreadSafeCallback<T> callback, ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { if (operator == TokenKind.PLUS || operator == TokenKind.UNION) { - ArrayList<ForkJoinTask<Void>> tasks = new ArrayList<>(operands.size()); - for (final QueryExpression operand : operands) { - tasks.add(ForkJoinTask.adapt( - new Callable<Void>() { - @Override - public Void call() throws QueryException, InterruptedException { - env.eval(operand, context, callback); - return null; - } - })); - } - for (ForkJoinTask<?> task : tasks) { - forkJoinPool.submit(task); - } - try { - MoreFutures.waitForAllInterruptiblyFailFast(tasks); - } catch (ExecutionException e) { - Throwables.propagateIfPossible( - e.getCause(), QueryException.class, InterruptedException.class); - throw new IllegalStateException(e); - } + parEvalPlus(operands, env, context, callback, forkJoinPool); + } else if (operator == TokenKind.EXCEPT || operator == TokenKind.MINUS) { + parEvalMinus(operands, env, context, callback, forkJoinPool); } else { evalImpl(env, context, callback); } } + /** + * Evaluates an expression of the form "e1 + e2 + ... + eK" by evaluating all the subexpressions + * in parallel. + */ + private static <T> void parEvalPlus( + ImmutableList<QueryExpression> operands, + final QueryEnvironment<T> env, + final VariableContext<T> context, + final ThreadSafeCallback<T> callback, + ForkJoinPool forkJoinPool) + throws QueryException, InterruptedException { + ArrayList<ForkJoinTask<Void>> tasks = new ArrayList<>(operands.size()); + for (final QueryExpression operand : operands) { + tasks.add(ForkJoinTask.adapt( + new Callable<Void>() { + @Override + public Void call() throws QueryException, InterruptedException { + env.eval(operand, context, callback); + return null; + } + })); + } + for (ForkJoinTask<?> task : tasks) { + forkJoinPool.submit(task); + } + try { + MoreFutures.waitForAllInterruptiblyFailFast(tasks); + } catch (ExecutionException e) { + Throwables.propagateIfPossible( + e.getCause(), QueryException.class, InterruptedException.class); + throw new IllegalStateException(e); + } + } + + /** + * Evaluates an expression of the form "e1 - e2 - ... - eK" by noting its equivalence to + * "e1 - (e2 + ... + eK)" and evaluating the subexpressions on the right-hand-side in parallel. + */ + private static <T> void parEvalMinus( + ImmutableList<QueryExpression> operands, + QueryEnvironment<T> env, + VariableContext<T> context, + ThreadSafeCallback<T> callback, + ForkJoinPool forkJoinPool) + throws QueryException, InterruptedException { + final Set<T> lhsValue = + Sets.newConcurrentHashSet(QueryUtil.evalAll(env, context, operands.get(0))); + ThreadSafeCallback<T> subtractionCallback = new ThreadSafeCallback<T>() { + @Override + public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { + for (T target : partialResult) { + lhsValue.remove(target); + } + } + }; + parEvalPlus( + operands.subList(1, operands.size()), env, context, subtractionCallback, forkJoinPool); + callback.process(lhsValue); + } + @Override public void collectTargetPatterns(Collection<String> literals) { for (QueryExpression subExpression : operands) { |