aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java132
1 files changed, 91 insertions, 41 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 2accdcea63..30a7aa65a1 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -36,6 +36,7 @@ import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPattern;
import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.graph.Digraph;
@@ -50,13 +51,13 @@ import com.google.devtools.build.lib.pkgcache.PathPackageLocator;
import com.google.devtools.build.lib.pkgcache.TargetPatternEvaluator;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
+import com.google.devtools.build.lib.query2.engine.BinaryOperatorExpression;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.FunctionExpression;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
import com.google.devtools.build.lib.query2.engine.RdepsFunction;
import com.google.devtools.build.lib.query2.engine.TargetLiteral;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
@@ -85,6 +86,7 @@ import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
@@ -93,6 +95,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -106,9 +109,11 @@ import javax.annotation.Nullable;
* even if the full closure isn't needed.
*/
public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
+
// 10k is likely a good balance between using batch efficiently and not blowing up memory.
// TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
private static final int BATCH_CALLBACK_SIZE = 10000;
+ private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
protected WalkableGraph graph;
private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
@@ -134,7 +139,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
private final ListeningExecutorService threadPool =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
+ DEFAULT_THREAD_COUNT,
new ThreadFactoryBuilder().setNameFormat("GetPackages-%d").build()));
private RecursivePackageProviderBackedTargetPatternResolver resolver;
@@ -158,9 +163,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
int loadingPhaseThreads,
EventHandler eventHandler,
Set<Setting> settings,
- Iterable<QueryFunction> extraFunctions, String parserPrefix,
+ Iterable<QueryFunction> extraFunctions,
+ String parserPrefix,
WalkableGraphFactory graphFactory,
- List<String> universeScope, PathPackageLocator pkgPath) {
+ List<String> universeScope,
+ PathPackageLocator pkgPath) {
super(
keepGoing,
/*strictScope=*/ true,
@@ -268,21 +275,25 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
// This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
// case of a race between the original callback and the eventHandler.
final BatchStreamedCallback aggregator =
- new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier());
+ new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
final AtomicBoolean empty = new AtomicBoolean(true);
+ Callback<Target> callbackWithEmptyCheck =
+ new Callback<Target>() {
+ @Override
+ public void process(Iterable<Target> partialResult)
+ throws QueryException, InterruptedException {
+ empty.compareAndSet(true, Iterables.isEmpty(partialResult));
+ aggregator.process(partialResult);
+ }
+ };
try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) {
try {
- expr.eval(
- this,
- new Callback<Target>() {
- @Override
- public void process(Iterable<Target> partialResult)
- throws QueryException, InterruptedException {
- empty.compareAndSet(true, Iterables.isEmpty(partialResult));
- aggregator.process(partialResult);
- }
- });
+ if (canEvalConcurrently(expr)) {
+ expr.evalConcurrently(this, callbackWithEmptyCheck, threadPool);
+ } else {
+ expr.eval(this, callbackWithEmptyCheck);
+ }
} catch (QueryException e) {
throw new QueryException(e, expr);
}
@@ -293,17 +304,37 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
if (!keepGoing) {
// This case represents loading-phase errors reported during evaluation
// of target patterns that don't cause evaluation to fail per se.
- throw new QueryException("Evaluation of query \"" + expr
- + "\" failed due to BUILD file errors");
+ throw new QueryException(
+ "Evaluation of query \"" + expr + "\" failed due to BUILD file errors");
} else {
- eventHandler.handle(Event.warn("--keep_going specified, ignoring errors. "
- + "Results may be inaccurate"));
+ eventHandler.handle(
+ Event.warn("--keep_going specified, ignoring errors. " + "Results may be inaccurate"));
}
}
return new QueryEvalResult(!eventHandler.hasErrors(), empty.get());
}
+ // TODO(mschaller): This method and its use above are a quick temporary fix to a threadsafety
+ // problem that can happen when the operands of a BinaryOperatorExpression contain LetExpressions.
+ // Namely, concurrent reads and writes to AbstractBlazeQueryEnvironment#letBindings may fail or
+ // produce the wrong results.
+ // For now, this limits concurrent query expression evaluation to BinaryOperatorExpressions with
+ // TargetLiteral operands.
+ private static boolean canEvalConcurrently(QueryExpression expr) {
+ if (!(expr instanceof BinaryOperatorExpression)) {
+ return false;
+ }
+ BinaryOperatorExpression binaryExpr = (BinaryOperatorExpression) expr;
+ ImmutableList<QueryExpression> operands = binaryExpr.getOperands();
+ for (QueryExpression operand : operands) {
+ if (!(operand instanceof TargetLiteral)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input) {
ImmutableMap.Builder<Target, Collection<Target>> result = ImmutableMap.builder();
@@ -447,18 +478,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
expr.eval(this, callback);
}
- private static Uniquifier<Target> uniquifier() {
- return new AbstractUniquifier<Target, Label>() {
- @Override
- protected Label extractKey(Target target) {
- return target.getLabel();
- }
- };
- }
-
@Override
public Uniquifier<Target> createUniquifier() {
- return uniquifier();
+ return new ConcurrentUniquifier();
}
@Override
@@ -841,6 +863,26 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
.build();
}
+ @ThreadSafe
+ private static class ConcurrentUniquifier implements Uniquifier<Target> {
+
+ // Note that setting initialCapacity to BATCH_CALLBACK_SIZE is not especially principled.
+ private final Set<Label> seen =
+ Collections.newSetFromMap(
+ new ConcurrentHashMap<Label, Boolean>(BATCH_CALLBACK_SIZE, .75f, DEFAULT_THREAD_COUNT));
+
+ @Override
+ public ImmutableList<Target> unique(Iterable<Target> newElements) {
+ ImmutableList.Builder<Target> builder = ImmutableList.builder();
+ for (Target newElement : newElements) {
+ if (seen.add(newElement.getLabel())) {
+ builder.add(newElement);
+ }
+ }
+ return builder.build();
+ }
+ }
+
/**
* Wraps a {@link Callback} and guarantees that all calls to the original will have at least
* {@code batchThreshold} {@link Target}s, except for the final such call.
@@ -850,36 +892,44 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
* <p>After this object's {@link #process} has been called for the last time, {#link
* #processLastPending} must be called to "flush" any remaining {@link Target}s through to the
* original.
+ *
+ * <p>This callback may be called from multiple threads concurrently. At most one thread will
+ * call the wrapped {@code callback} concurrently.
*/
+ @ThreadSafe
private static class BatchStreamedCallback implements Callback<Target> {
private final Callback<Target> callback;
- private final Uniquifier<Target> uniquifier;
+ private final Uniquifier<Target> uniquifier = new ConcurrentUniquifier();
+ private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
- private BatchStreamedCallback(Callback<Target> callback, int batchThreshold,
- Uniquifier<Target> uniquifier) {
+ private BatchStreamedCallback(Callback<Target> callback, int batchThreshold) {
this.callback = callback;
this.batchThreshold = batchThreshold;
- this.uniquifier = uniquifier;
}
@Override
public void process(Iterable<Target> partialResult)
throws QueryException, InterruptedException {
- Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
- pending.addAll(uniquifier.unique(partialResult));
- if (pending.size() >= batchThreshold) {
- callback.process(pending);
- pending = new ArrayList<>();
+ ImmutableList<Target> uniquifiedTargets = uniquifier.unique(partialResult);
+ synchronized (pendingLock) {
+ Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
+ pending.addAll(uniquifiedTargets);
+ if (pending.size() >= batchThreshold) {
+ callback.process(pending);
+ pending = new ArrayList<>();
+ }
}
}
private void processLastPending() throws QueryException, InterruptedException {
- if (!pending.isEmpty()) {
- callback.process(pending);
- pending = null;
+ synchronized (pendingLock) {
+ if (!pending.isEmpty()) {
+ callback.process(pending);
+ pending = null;
+ }
}
}
}