diff options
author | Mark Schaller <mschaller@google.com> | 2016-06-21 22:51:14 +0000 |
---|---|---|
committer | Philipp Wollermann <philwo@google.com> | 2016-06-22 10:48:17 +0000 |
commit | 20c75010f15f130d6107f98cdbbfad004f6bd549 (patch) | |
tree | 7db7791eb33e05ea30d05f9259e375a69ddfa077 | |
parent | 4b801f2abf445e7c2e82784cf3f44e9239b52d5c (diff) |
Parallelize (some of) BinaryOperatorExpression
Adds evalConcurrently to QueryExpression so that expression
implementations that support concurrent evaluation can do so using the
supplied executor service.
Implements concurrent evaluation for the PLUS/UNION cases of
BinaryOperatorExpression.
Because evalConcurrently requires its callback to be threadsafe, but
the callback passed to evaluateQuery may only be called by one thread
at a time, this change makes the BatchStreamedCallback constructed by
SkyQueryEnvironment threadsafe, including its uniquifier.
However, there is a thread-safety problem when the operands of
BinaryOperatorExpression are LetExpressions, because their evaluation
involves mutating state in the query environment. A future change will
fix that. For now, concurrent evaluation is only attempted when the
query expression is a BinaryOperatorExpression and all its operands are
target literals.
--
MOS_MIGRATED_REVID=125505247
3 files changed, 163 insertions, 45 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 2accdcea63..30a7aa65a1 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -36,6 +36,7 @@ import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPattern; import com.google.devtools.build.lib.collect.CompactHashSet; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; import com.google.devtools.build.lib.graph.Digraph; @@ -50,13 +51,13 @@ import com.google.devtools.build.lib.pkgcache.PathPackageLocator; import com.google.devtools.build.lib.pkgcache.TargetPatternEvaluator; import com.google.devtools.build.lib.profiler.AutoProfiler; import com.google.devtools.build.lib.query2.engine.AllRdepsFunction; +import com.google.devtools.build.lib.query2.engine.BinaryOperatorExpression; import com.google.devtools.build.lib.query2.engine.Callback; import com.google.devtools.build.lib.query2.engine.FunctionExpression; import com.google.devtools.build.lib.query2.engine.QueryEvalResult; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper; -import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier; import com.google.devtools.build.lib.query2.engine.RdepsFunction; import com.google.devtools.build.lib.query2.engine.TargetLiteral; import com.google.devtools.build.lib.query2.engine.Uniquifier; @@ -85,6 +86,7 @@ import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; @@ -93,6 +95,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; @@ -106,9 +109,11 @@ import javax.annotation.Nullable; * even if the full closure isn't needed. */ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { + // 10k is likely a good balance between using batch efficiently and not blowing up memory. // TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant. private static final int BATCH_CALLBACK_SIZE = 10000; + private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors(); protected WalkableGraph graph; private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier; @@ -134,7 +139,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { private final ListeningExecutorService threadPool = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors(), + DEFAULT_THREAD_COUNT, new ThreadFactoryBuilder().setNameFormat("GetPackages-%d").build())); private RecursivePackageProviderBackedTargetPatternResolver resolver; @@ -158,9 +163,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { int loadingPhaseThreads, EventHandler eventHandler, Set<Setting> settings, - Iterable<QueryFunction> extraFunctions, String parserPrefix, + Iterable<QueryFunction> extraFunctions, + String parserPrefix, WalkableGraphFactory graphFactory, - List<String> universeScope, PathPackageLocator pkgPath) { + List<String> universeScope, + PathPackageLocator pkgPath) { super( keepGoing, /*strictScope=*/ true, @@ -268,21 +275,25 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { // This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely // case of a race between the original callback and the eventHandler. final BatchStreamedCallback aggregator = - new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier()); + new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE); final AtomicBoolean empty = new AtomicBoolean(true); + Callback<Target> callbackWithEmptyCheck = + new Callback<Target>() { + @Override + public void process(Iterable<Target> partialResult) + throws QueryException, InterruptedException { + empty.compareAndSet(true, Iterables.isEmpty(partialResult)); + aggregator.process(partialResult); + } + }; try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) { try { - expr.eval( - this, - new Callback<Target>() { - @Override - public void process(Iterable<Target> partialResult) - throws QueryException, InterruptedException { - empty.compareAndSet(true, Iterables.isEmpty(partialResult)); - aggregator.process(partialResult); - } - }); + if (canEvalConcurrently(expr)) { + expr.evalConcurrently(this, callbackWithEmptyCheck, threadPool); + } else { + expr.eval(this, callbackWithEmptyCheck); + } } catch (QueryException e) { throw new QueryException(e, expr); } @@ -293,17 +304,37 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { if (!keepGoing) { // This case represents loading-phase errors reported during evaluation // of target patterns that don't cause evaluation to fail per se. - throw new QueryException("Evaluation of query \"" + expr - + "\" failed due to BUILD file errors"); + throw new QueryException( + "Evaluation of query \"" + expr + "\" failed due to BUILD file errors"); } else { - eventHandler.handle(Event.warn("--keep_going specified, ignoring errors. " - + "Results may be inaccurate")); + eventHandler.handle( + Event.warn("--keep_going specified, ignoring errors. " + "Results may be inaccurate")); } } return new QueryEvalResult(!eventHandler.hasErrors(), empty.get()); } + // TODO(mschaller): This method and its use above are a quick temporary fix to a threadsafety + // problem that can happen when the operands of a BinaryOperatorExpression contain LetExpressions. + // Namely, concurrent reads and writes to AbstractBlazeQueryEnvironment#letBindings may fail or + // produce the wrong results. + // For now, this limits concurrent query expression evaluation to BinaryOperatorExpressions with + // TargetLiteral operands. + private static boolean canEvalConcurrently(QueryExpression expr) { + if (!(expr instanceof BinaryOperatorExpression)) { + return false; + } + BinaryOperatorExpression binaryExpr = (BinaryOperatorExpression) expr; + ImmutableList<QueryExpression> operands = binaryExpr.getOperands(); + for (QueryExpression operand : operands) { + if (!(operand instanceof TargetLiteral)) { + return false; + } + } + return true; + } + private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input) { ImmutableMap.Builder<Target, Collection<Target>> result = ImmutableMap.builder(); @@ -447,18 +478,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { expr.eval(this, callback); } - private static Uniquifier<Target> uniquifier() { - return new AbstractUniquifier<Target, Label>() { - @Override - protected Label extractKey(Target target) { - return target.getLabel(); - } - }; - } - @Override public Uniquifier<Target> createUniquifier() { - return uniquifier(); + return new ConcurrentUniquifier(); } @Override @@ -841,6 +863,26 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { .build(); } + @ThreadSafe + private static class ConcurrentUniquifier implements Uniquifier<Target> { + + // Note that setting initialCapacity to BATCH_CALLBACK_SIZE is not especially principled. + private final Set<Label> seen = + Collections.newSetFromMap( + new ConcurrentHashMap<Label, Boolean>(BATCH_CALLBACK_SIZE, .75f, DEFAULT_THREAD_COUNT)); + + @Override + public ImmutableList<Target> unique(Iterable<Target> newElements) { + ImmutableList.Builder<Target> builder = ImmutableList.builder(); + for (Target newElement : newElements) { + if (seen.add(newElement.getLabel())) { + builder.add(newElement); + } + } + return builder.build(); + } + } + /** * Wraps a {@link Callback} and guarantees that all calls to the original will have at least * {@code batchThreshold} {@link Target}s, except for the final such call. @@ -850,36 +892,44 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { * <p>After this object's {@link #process} has been called for the last time, {#link * #processLastPending} must be called to "flush" any remaining {@link Target}s through to the * original. + * + * <p>This callback may be called from multiple threads concurrently. At most one thread will + * call the wrapped {@code callback} concurrently. */ + @ThreadSafe private static class BatchStreamedCallback implements Callback<Target> { private final Callback<Target> callback; - private final Uniquifier<Target> uniquifier; + private final Uniquifier<Target> uniquifier = new ConcurrentUniquifier(); + private final Object pendingLock = new Object(); private List<Target> pending = new ArrayList<>(); private int batchThreshold; - private BatchStreamedCallback(Callback<Target> callback, int batchThreshold, - Uniquifier<Target> uniquifier) { + private BatchStreamedCallback(Callback<Target> callback, int batchThreshold) { this.callback = callback; this.batchThreshold = batchThreshold; - this.uniquifier = uniquifier; } @Override public void process(Iterable<Target> partialResult) throws QueryException, InterruptedException { - Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed"); - pending.addAll(uniquifier.unique(partialResult)); - if (pending.size() >= batchThreshold) { - callback.process(pending); - pending = new ArrayList<>(); + ImmutableList<Target> uniquifiedTargets = uniquifier.unique(partialResult); + synchronized (pendingLock) { + Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed"); + pending.addAll(uniquifiedTargets); + if (pending.size() >= batchThreshold) { + callback.process(pending); + pending = new ArrayList<>(); + } } } private void processLastPending() throws QueryException, InterruptedException { - if (!pending.isEmpty()) { - callback.process(pending); - pending = null; + synchronized (pendingLock) { + if (!pending.isEmpty()) { + callback.process(pending); + pending = null; + } } } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java index 66555c7bee..c1d74c9873 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java @@ -14,12 +14,19 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind; import com.google.devtools.build.lib.util.Preconditions; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; /** * A binary algebraic set operation. @@ -33,7 +40,7 @@ import java.util.Set; * | expr ('-' expr)+ * </pre> */ -class BinaryOperatorExpression extends QueryExpression { +public class BinaryOperatorExpression extends QueryExpression { private final Lexer.TokenKind operator; // ::= INTERSECT/CARET | UNION/PLUS | EXCEPT/MINUS private final ImmutableList<QueryExpression> operands; @@ -49,17 +56,62 @@ class BinaryOperatorExpression extends QueryExpression { return operator; } - ImmutableList<QueryExpression> getOperands() { + public ImmutableList<QueryExpression> getOperands() { return operands; } @Override public <T> void eval(QueryEnvironment<T> env, Callback<T> callback) throws QueryException, InterruptedException { + evalConcurrently(env, callback, MoreExecutors.newDirectExecutorService()); + } + @Override + public <T> void evalConcurrently( + final QueryEnvironment<T> env, + final Callback<T> callback, + ListeningExecutorService executorService) + throws QueryException, InterruptedException { if (operator == TokenKind.PLUS || operator == TokenKind.UNION) { - for (QueryExpression operand : operands) { - env.eval(operand, callback); + final AtomicReference<InterruptedException> interruptRef = new AtomicReference<>(); + final AtomicReference<QueryException> queryExceptionRef = new AtomicReference<>(); + ArrayList<ListenableFuture<?>> futures = new ArrayList<>(operands.size()); + for (final QueryExpression operand : operands) { + // When executorService has an implementation that evaluates runnables in a non-serial + // order, like a fixedSizeThreadPool, the following code does not guarantee that operands' + // targets are emitted via the callback in the operands' order. And that's OK! + // BinaryOperatorExpression is a set operation. The query documentation states + // that set operations don't introduce any ordering constraints of their own. + // + // Ordering constraints for other kinds of expressions are enforced by the query + // environment. + futures.add( + executorService.submit( + new Runnable() { + @Override + public void run() { + try { + env.eval(operand, callback); + } catch (QueryException e) { + queryExceptionRef.compareAndSet(null, e); + } catch (InterruptedException e) { + interruptRef.compareAndSet(null, e); + } + } + })); + } + try { + Futures.allAsList(futures).get(); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } + InterruptedException interruptedExceptionIfAny = interruptRef.get(); + if (interruptedExceptionIfAny != null) { + throw interruptedExceptionIfAny; + } + QueryException queryException = queryExceptionRef.get(); + if (queryException != null) { + throw queryException; } return; } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java index b29071329b..e93b397299 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java @@ -13,6 +13,8 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; +import com.google.common.util.concurrent.ListeningExecutorService; + import java.util.Collection; /** @@ -71,6 +73,20 @@ public abstract class QueryExpression { throws QueryException, InterruptedException; /** + * Evaluates this query in the specified environment, as in {@link + * #eval(QueryEnvironment, Callback)}. If the query expression supports concurrent evaluation, it + * may employ {@code executorService}. + * + * <p>The caller must ensure that both {@code env} and {@code callback} are effectively + * threadsafe. The query expression may call their methods from multiple threads. + */ + public <T> void evalConcurrently( + QueryEnvironment<T> env, Callback<T> callback, ListeningExecutorService executorService) + throws QueryException, InterruptedException { + this.eval(env, callback); + } + + /** * Collects all target patterns that are referenced anywhere within this query expression and adds * them to the given collection, which must be mutable. */ |