aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2016-06-21 22:51:14 +0000
committerGravatar Philipp Wollermann <philwo@google.com>2016-06-22 10:48:17 +0000
commit20c75010f15f130d6107f98cdbbfad004f6bd549 (patch)
tree7db7791eb33e05ea30d05f9259e375a69ddfa077 /src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
parent4b801f2abf445e7c2e82784cf3f44e9239b52d5c (diff)
Parallelize (some of) BinaryOperatorExpression
Adds evalConcurrently to QueryExpression so that expression implementations that support concurrent evaluation can do so using the supplied executor service. Implements concurrent evaluation for the PLUS/UNION cases of BinaryOperatorExpression. Because evalConcurrently requires its callback to be threadsafe, but the callback passed to evaluateQuery may only be called by one thread at a time, this change makes the BatchStreamedCallback constructed by SkyQueryEnvironment threadsafe, including its uniquifier. However, there is a thread-safety problem when the operands of BinaryOperatorExpression are LetExpressions, because their evaluation involves mutating state in the query environment. A future change will fix that. For now, concurrent evaluation is only attempted when the query expression is a BinaryOperatorExpression and all its operands are target literals. -- MOS_MIGRATED_REVID=125505247
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java132
1 files changed, 91 insertions, 41 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 2accdcea63..30a7aa65a1 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -36,6 +36,7 @@ import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPattern;
import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.graph.Digraph;
@@ -50,13 +51,13 @@ import com.google.devtools.build.lib.pkgcache.PathPackageLocator;
import com.google.devtools.build.lib.pkgcache.TargetPatternEvaluator;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
+import com.google.devtools.build.lib.query2.engine.BinaryOperatorExpression;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.FunctionExpression;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
import com.google.devtools.build.lib.query2.engine.RdepsFunction;
import com.google.devtools.build.lib.query2.engine.TargetLiteral;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
@@ -85,6 +86,7 @@ import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
@@ -93,6 +95,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -106,9 +109,11 @@ import javax.annotation.Nullable;
* even if the full closure isn't needed.
*/
public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
+
// 10k is likely a good balance between using batch efficiently and not blowing up memory.
// TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
private static final int BATCH_CALLBACK_SIZE = 10000;
+ private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
protected WalkableGraph graph;
private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
@@ -134,7 +139,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
private final ListeningExecutorService threadPool =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
+ DEFAULT_THREAD_COUNT,
new ThreadFactoryBuilder().setNameFormat("GetPackages-%d").build()));
private RecursivePackageProviderBackedTargetPatternResolver resolver;
@@ -158,9 +163,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
int loadingPhaseThreads,
EventHandler eventHandler,
Set<Setting> settings,
- Iterable<QueryFunction> extraFunctions, String parserPrefix,
+ Iterable<QueryFunction> extraFunctions,
+ String parserPrefix,
WalkableGraphFactory graphFactory,
- List<String> universeScope, PathPackageLocator pkgPath) {
+ List<String> universeScope,
+ PathPackageLocator pkgPath) {
super(
keepGoing,
/*strictScope=*/ true,
@@ -268,21 +275,25 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
// This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
// case of a race between the original callback and the eventHandler.
final BatchStreamedCallback aggregator =
- new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier());
+ new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
final AtomicBoolean empty = new AtomicBoolean(true);
+ Callback<Target> callbackWithEmptyCheck =
+ new Callback<Target>() {
+ @Override
+ public void process(Iterable<Target> partialResult)
+ throws QueryException, InterruptedException {
+ empty.compareAndSet(true, Iterables.isEmpty(partialResult));
+ aggregator.process(partialResult);
+ }
+ };
try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) {
try {
- expr.eval(
- this,
- new Callback<Target>() {
- @Override
- public void process(Iterable<Target> partialResult)
- throws QueryException, InterruptedException {
- empty.compareAndSet(true, Iterables.isEmpty(partialResult));
- aggregator.process(partialResult);
- }
- });
+ if (canEvalConcurrently(expr)) {
+ expr.evalConcurrently(this, callbackWithEmptyCheck, threadPool);
+ } else {
+ expr.eval(this, callbackWithEmptyCheck);
+ }
} catch (QueryException e) {
throw new QueryException(e, expr);
}
@@ -293,17 +304,37 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
if (!keepGoing) {
// This case represents loading-phase errors reported during evaluation
// of target patterns that don't cause evaluation to fail per se.
- throw new QueryException("Evaluation of query \"" + expr
- + "\" failed due to BUILD file errors");
+ throw new QueryException(
+ "Evaluation of query \"" + expr + "\" failed due to BUILD file errors");
} else {
- eventHandler.handle(Event.warn("--keep_going specified, ignoring errors. "
- + "Results may be inaccurate"));
+ eventHandler.handle(
+ Event.warn("--keep_going specified, ignoring errors. " + "Results may be inaccurate"));
}
}
return new QueryEvalResult(!eventHandler.hasErrors(), empty.get());
}
+ // TODO(mschaller): This method and its use above are a quick temporary fix to a threadsafety
+ // problem that can happen when the operands of a BinaryOperatorExpression contain LetExpressions.
+ // Namely, concurrent reads and writes to AbstractBlazeQueryEnvironment#letBindings may fail or
+ // produce the wrong results.
+ // For now, this limits concurrent query expression evaluation to BinaryOperatorExpressions with
+ // TargetLiteral operands.
+ private static boolean canEvalConcurrently(QueryExpression expr) {
+ if (!(expr instanceof BinaryOperatorExpression)) {
+ return false;
+ }
+ BinaryOperatorExpression binaryExpr = (BinaryOperatorExpression) expr;
+ ImmutableList<QueryExpression> operands = binaryExpr.getOperands();
+ for (QueryExpression operand : operands) {
+ if (!(operand instanceof TargetLiteral)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input) {
ImmutableMap.Builder<Target, Collection<Target>> result = ImmutableMap.builder();
@@ -447,18 +478,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
expr.eval(this, callback);
}
- private static Uniquifier<Target> uniquifier() {
- return new AbstractUniquifier<Target, Label>() {
- @Override
- protected Label extractKey(Target target) {
- return target.getLabel();
- }
- };
- }
-
@Override
public Uniquifier<Target> createUniquifier() {
- return uniquifier();
+ return new ConcurrentUniquifier();
}
@Override
@@ -841,6 +863,26 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
.build();
}
+ @ThreadSafe
+ private static class ConcurrentUniquifier implements Uniquifier<Target> {
+
+ // Note that setting initialCapacity to BATCH_CALLBACK_SIZE is not especially principled.
+ private final Set<Label> seen =
+ Collections.newSetFromMap(
+ new ConcurrentHashMap<Label, Boolean>(BATCH_CALLBACK_SIZE, .75f, DEFAULT_THREAD_COUNT));
+
+ @Override
+ public ImmutableList<Target> unique(Iterable<Target> newElements) {
+ ImmutableList.Builder<Target> builder = ImmutableList.builder();
+ for (Target newElement : newElements) {
+ if (seen.add(newElement.getLabel())) {
+ builder.add(newElement);
+ }
+ }
+ return builder.build();
+ }
+ }
+
/**
* Wraps a {@link Callback} and guarantees that all calls to the original will have at least
* {@code batchThreshold} {@link Target}s, except for the final such call.
@@ -850,36 +892,44 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
* <p>After this object's {@link #process} has been called for the last time, {#link
* #processLastPending} must be called to "flush" any remaining {@link Target}s through to the
* original.
+ *
+ * <p>This callback may be called from multiple threads concurrently. At most one thread will
+ * call the wrapped {@code callback} concurrently.
*/
+ @ThreadSafe
private static class BatchStreamedCallback implements Callback<Target> {
private final Callback<Target> callback;
- private final Uniquifier<Target> uniquifier;
+ private final Uniquifier<Target> uniquifier = new ConcurrentUniquifier();
+ private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
- private BatchStreamedCallback(Callback<Target> callback, int batchThreshold,
- Uniquifier<Target> uniquifier) {
+ private BatchStreamedCallback(Callback<Target> callback, int batchThreshold) {
this.callback = callback;
this.batchThreshold = batchThreshold;
- this.uniquifier = uniquifier;
}
@Override
public void process(Iterable<Target> partialResult)
throws QueryException, InterruptedException {
- Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
- pending.addAll(uniquifier.unique(partialResult));
- if (pending.size() >= batchThreshold) {
- callback.process(pending);
- pending = new ArrayList<>();
+ ImmutableList<Target> uniquifiedTargets = uniquifier.unique(partialResult);
+ synchronized (pendingLock) {
+ Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
+ pending.addAll(uniquifiedTargets);
+ if (pending.size() >= batchThreshold) {
+ callback.process(pending);
+ pending = new ArrayList<>();
+ }
}
}
private void processLastPending() throws QueryException, InterruptedException {
- if (!pending.isEmpty()) {
- callback.process(pending);
- pending = null;
+ synchronized (pendingLock) {
+ if (!pending.isEmpty()) {
+ callback.process(pending);
+ pending = null;
+ }
}
}
}