aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java132
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java60
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java16
3 files changed, 163 insertions, 45 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 2accdcea63..30a7aa65a1 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -36,6 +36,7 @@ import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPattern;
import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.graph.Digraph;
@@ -50,13 +51,13 @@ import com.google.devtools.build.lib.pkgcache.PathPackageLocator;
import com.google.devtools.build.lib.pkgcache.TargetPatternEvaluator;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
+import com.google.devtools.build.lib.query2.engine.BinaryOperatorExpression;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.FunctionExpression;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
import com.google.devtools.build.lib.query2.engine.RdepsFunction;
import com.google.devtools.build.lib.query2.engine.TargetLiteral;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
@@ -85,6 +86,7 @@ import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
@@ -93,6 +95,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -106,9 +109,11 @@ import javax.annotation.Nullable;
* even if the full closure isn't needed.
*/
public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
+
// 10k is likely a good balance between using batch efficiently and not blowing up memory.
// TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
private static final int BATCH_CALLBACK_SIZE = 10000;
+ private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
protected WalkableGraph graph;
private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
@@ -134,7 +139,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
private final ListeningExecutorService threadPool =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
+ DEFAULT_THREAD_COUNT,
new ThreadFactoryBuilder().setNameFormat("GetPackages-%d").build()));
private RecursivePackageProviderBackedTargetPatternResolver resolver;
@@ -158,9 +163,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
int loadingPhaseThreads,
EventHandler eventHandler,
Set<Setting> settings,
- Iterable<QueryFunction> extraFunctions, String parserPrefix,
+ Iterable<QueryFunction> extraFunctions,
+ String parserPrefix,
WalkableGraphFactory graphFactory,
- List<String> universeScope, PathPackageLocator pkgPath) {
+ List<String> universeScope,
+ PathPackageLocator pkgPath) {
super(
keepGoing,
/*strictScope=*/ true,
@@ -268,21 +275,25 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
// This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
// case of a race between the original callback and the eventHandler.
final BatchStreamedCallback aggregator =
- new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier());
+ new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
final AtomicBoolean empty = new AtomicBoolean(true);
+ Callback<Target> callbackWithEmptyCheck =
+ new Callback<Target>() {
+ @Override
+ public void process(Iterable<Target> partialResult)
+ throws QueryException, InterruptedException {
+ empty.compareAndSet(true, Iterables.isEmpty(partialResult));
+ aggregator.process(partialResult);
+ }
+ };
try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) {
try {
- expr.eval(
- this,
- new Callback<Target>() {
- @Override
- public void process(Iterable<Target> partialResult)
- throws QueryException, InterruptedException {
- empty.compareAndSet(true, Iterables.isEmpty(partialResult));
- aggregator.process(partialResult);
- }
- });
+ if (canEvalConcurrently(expr)) {
+ expr.evalConcurrently(this, callbackWithEmptyCheck, threadPool);
+ } else {
+ expr.eval(this, callbackWithEmptyCheck);
+ }
} catch (QueryException e) {
throw new QueryException(e, expr);
}
@@ -293,17 +304,37 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
if (!keepGoing) {
// This case represents loading-phase errors reported during evaluation
// of target patterns that don't cause evaluation to fail per se.
- throw new QueryException("Evaluation of query \"" + expr
- + "\" failed due to BUILD file errors");
+ throw new QueryException(
+ "Evaluation of query \"" + expr + "\" failed due to BUILD file errors");
} else {
- eventHandler.handle(Event.warn("--keep_going specified, ignoring errors. "
- + "Results may be inaccurate"));
+ eventHandler.handle(
+ Event.warn("--keep_going specified, ignoring errors. " + "Results may be inaccurate"));
}
}
return new QueryEvalResult(!eventHandler.hasErrors(), empty.get());
}
+ // TODO(mschaller): This method and its use above are a quick temporary fix to a threadsafety
+ // problem that can happen when the operands of a BinaryOperatorExpression contain LetExpressions.
+ // Namely, concurrent reads and writes to AbstractBlazeQueryEnvironment#letBindings may fail or
+ // produce the wrong results.
+ // For now, this limits concurrent query expression evaluation to BinaryOperatorExpressions with
+ // TargetLiteral operands.
+ private static boolean canEvalConcurrently(QueryExpression expr) {
+ if (!(expr instanceof BinaryOperatorExpression)) {
+ return false;
+ }
+ BinaryOperatorExpression binaryExpr = (BinaryOperatorExpression) expr;
+ ImmutableList<QueryExpression> operands = binaryExpr.getOperands();
+ for (QueryExpression operand : operands) {
+ if (!(operand instanceof TargetLiteral)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input) {
ImmutableMap.Builder<Target, Collection<Target>> result = ImmutableMap.builder();
@@ -447,18 +478,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
expr.eval(this, callback);
}
- private static Uniquifier<Target> uniquifier() {
- return new AbstractUniquifier<Target, Label>() {
- @Override
- protected Label extractKey(Target target) {
- return target.getLabel();
- }
- };
- }
-
@Override
public Uniquifier<Target> createUniquifier() {
- return uniquifier();
+ return new ConcurrentUniquifier();
}
@Override
@@ -841,6 +863,26 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
.build();
}
+ @ThreadSafe
+ private static class ConcurrentUniquifier implements Uniquifier<Target> {
+
+ // Note that setting initialCapacity to BATCH_CALLBACK_SIZE is not especially principled.
+ private final Set<Label> seen =
+ Collections.newSetFromMap(
+ new ConcurrentHashMap<Label, Boolean>(BATCH_CALLBACK_SIZE, .75f, DEFAULT_THREAD_COUNT));
+
+ @Override
+ public ImmutableList<Target> unique(Iterable<Target> newElements) {
+ ImmutableList.Builder<Target> builder = ImmutableList.builder();
+ for (Target newElement : newElements) {
+ if (seen.add(newElement.getLabel())) {
+ builder.add(newElement);
+ }
+ }
+ return builder.build();
+ }
+ }
+
/**
* Wraps a {@link Callback} and guarantees that all calls to the original will have at least
* {@code batchThreshold} {@link Target}s, except for the final such call.
@@ -850,36 +892,44 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
* <p>After this object's {@link #process} has been called for the last time, {#link
* #processLastPending} must be called to "flush" any remaining {@link Target}s through to the
* original.
+ *
+ * <p>This callback may be called from multiple threads concurrently. At most one thread will
+ * call the wrapped {@code callback} concurrently.
*/
+ @ThreadSafe
private static class BatchStreamedCallback implements Callback<Target> {
private final Callback<Target> callback;
- private final Uniquifier<Target> uniquifier;
+ private final Uniquifier<Target> uniquifier = new ConcurrentUniquifier();
+ private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
- private BatchStreamedCallback(Callback<Target> callback, int batchThreshold,
- Uniquifier<Target> uniquifier) {
+ private BatchStreamedCallback(Callback<Target> callback, int batchThreshold) {
this.callback = callback;
this.batchThreshold = batchThreshold;
- this.uniquifier = uniquifier;
}
@Override
public void process(Iterable<Target> partialResult)
throws QueryException, InterruptedException {
- Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
- pending.addAll(uniquifier.unique(partialResult));
- if (pending.size() >= batchThreshold) {
- callback.process(pending);
- pending = new ArrayList<>();
+ ImmutableList<Target> uniquifiedTargets = uniquifier.unique(partialResult);
+ synchronized (pendingLock) {
+ Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
+ pending.addAll(uniquifiedTargets);
+ if (pending.size() >= batchThreshold) {
+ callback.process(pending);
+ pending = new ArrayList<>();
+ }
}
}
private void processLastPending() throws QueryException, InterruptedException {
- if (!pending.isEmpty()) {
- callback.process(pending);
- pending = null;
+ synchronized (pendingLock) {
+ if (!pending.isEmpty()) {
+ callback.process(pending);
+ pending = null;
+ }
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
index 66555c7bee..c1d74c9873 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
@@ -14,12 +14,19 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind;
import com.google.devtools.build.lib.util.Preconditions;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
/**
* A binary algebraic set operation.
@@ -33,7 +40,7 @@ import java.util.Set;
* | expr ('-' expr)+
* </pre>
*/
-class BinaryOperatorExpression extends QueryExpression {
+public class BinaryOperatorExpression extends QueryExpression {
private final Lexer.TokenKind operator; // ::= INTERSECT/CARET | UNION/PLUS | EXCEPT/MINUS
private final ImmutableList<QueryExpression> operands;
@@ -49,17 +56,62 @@ class BinaryOperatorExpression extends QueryExpression {
return operator;
}
- ImmutableList<QueryExpression> getOperands() {
+ public ImmutableList<QueryExpression> getOperands() {
return operands;
}
@Override
public <T> void eval(QueryEnvironment<T> env, Callback<T> callback)
throws QueryException, InterruptedException {
+ evalConcurrently(env, callback, MoreExecutors.newDirectExecutorService());
+ }
+ @Override
+ public <T> void evalConcurrently(
+ final QueryEnvironment<T> env,
+ final Callback<T> callback,
+ ListeningExecutorService executorService)
+ throws QueryException, InterruptedException {
if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
- for (QueryExpression operand : operands) {
- env.eval(operand, callback);
+ final AtomicReference<InterruptedException> interruptRef = new AtomicReference<>();
+ final AtomicReference<QueryException> queryExceptionRef = new AtomicReference<>();
+ ArrayList<ListenableFuture<?>> futures = new ArrayList<>(operands.size());
+ for (final QueryExpression operand : operands) {
+ // When executorService has an implementation that evaluates runnables in a non-serial
+ // order, like a fixedSizeThreadPool, the following code does not guarantee that operands'
+ // targets are emitted via the callback in the operands' order. And that's OK!
+ // BinaryOperatorExpression is a set operation. The query documentation states
+ // that set operations don't introduce any ordering constraints of their own.
+ //
+ // Ordering constraints for other kinds of expressions are enforced by the query
+ // environment.
+ futures.add(
+ executorService.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ env.eval(operand, callback);
+ } catch (QueryException e) {
+ queryExceptionRef.compareAndSet(null, e);
+ } catch (InterruptedException e) {
+ interruptRef.compareAndSet(null, e);
+ }
+ }
+ }));
+ }
+ try {
+ Futures.allAsList(futures).get();
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ InterruptedException interruptedExceptionIfAny = interruptRef.get();
+ if (interruptedExceptionIfAny != null) {
+ throw interruptedExceptionIfAny;
+ }
+ QueryException queryException = queryExceptionRef.get();
+ if (queryException != null) {
+ throw queryException;
}
return;
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
index b29071329b..e93b397299 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
@@ -13,6 +13,8 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
import java.util.Collection;
/**
@@ -71,6 +73,20 @@ public abstract class QueryExpression {
throws QueryException, InterruptedException;
/**
+ * Evaluates this query in the specified environment, as in {@link
+ * #eval(QueryEnvironment, Callback)}. If the query expression supports concurrent evaluation, it
+ * may employ {@code executorService}.
+ *
+ * <p>The caller must ensure that both {@code env} and {@code callback} are effectively
+ * threadsafe. The query expression may call their methods from multiple threads.
+ */
+ public <T> void evalConcurrently(
+ QueryEnvironment<T> env, Callback<T> callback, ListeningExecutorService executorService)
+ throws QueryException, InterruptedException {
+ this.eval(env, callback);
+ }
+
+ /**
* Collects all target patterns that are referenced anywhere within this query expression and adds
* them to the given collection, which must be mutable.
*/