aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2
diff options
context:
space:
mode:
author: Mark Schaller <mschaller@google.com>, 2016-06-21 22:51:14 +0000
committer: Philipp Wollermann <philwo@google.com>, 2016-06-22 10:48:17 +0000
commit: 20c75010f15f130d6107f98cdbbfad004f6bd549 (patch)
tree: 7db7791eb33e05ea30d05f9259e375a69ddfa077 /src/main/java/com/google/devtools/build/lib/query2
parent: 4b801f2abf445e7c2e82784cf3f44e9239b52d5c (diff)
Parallelize (some of) BinaryOperatorExpression
Adds evalConcurrently to QueryExpression so that expression implementations that support concurrent evaluation can do so using the supplied executor service. Implements concurrent evaluation for the PLUS/UNION cases of BinaryOperatorExpression. Because evalConcurrently requires its callback to be threadsafe, but the callback passed to evaluateQuery may only be called by one thread at a time, this change makes the BatchStreamedCallback constructed by SkyQueryEnvironment threadsafe, including its uniquifier. However, there is a thread-safety problem when the operands of BinaryOperatorExpression are LetExpressions, because their evaluation involves mutating state in the query environment. A future change will fix that. For now, concurrent evaluation is only attempted when the query expression is a BinaryOperatorExpression and all its operands are target literals. -- MOS_MIGRATED_REVID=125505247
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2')
-rw-r--r-- src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java 132
-rw-r--r-- src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java 60
-rw-r--r-- src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java 16
3 files changed, 163 insertions, 45 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 2accdcea63..30a7aa65a1 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -36,6 +36,7 @@ import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPattern;
import com.google.devtools.build.lib.collect.CompactHashSet;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.graph.Digraph;
@@ -50,13 +51,13 @@ import com.google.devtools.build.lib.pkgcache.PathPackageLocator;
import com.google.devtools.build.lib.pkgcache.TargetPatternEvaluator;
import com.google.devtools.build.lib.profiler.AutoProfiler;
import com.google.devtools.build.lib.query2.engine.AllRdepsFunction;
+import com.google.devtools.build.lib.query2.engine.BinaryOperatorExpression;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.FunctionExpression;
import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
import com.google.devtools.build.lib.query2.engine.RdepsFunction;
import com.google.devtools.build.lib.query2.engine.TargetLiteral;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
@@ -85,6 +86,7 @@ import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
@@ -93,6 +95,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
@@ -106,9 +109,11 @@ import javax.annotation.Nullable;
* even if the full closure isn't needed.
*/
public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
+
// 10k is likely a good balance between using batch efficiently and not blowing up memory.
// TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
private static final int BATCH_CALLBACK_SIZE = 10000;
+ private static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors();
protected WalkableGraph graph;
private Supplier<ImmutableSet<PathFragment>> blacklistPatternsSupplier;
@@ -134,7 +139,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
private final ListeningExecutorService threadPool =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
- Runtime.getRuntime().availableProcessors(),
+ DEFAULT_THREAD_COUNT,
new ThreadFactoryBuilder().setNameFormat("GetPackages-%d").build()));
private RecursivePackageProviderBackedTargetPatternResolver resolver;
@@ -158,9 +163,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
int loadingPhaseThreads,
EventHandler eventHandler,
Set<Setting> settings,
- Iterable<QueryFunction> extraFunctions, String parserPrefix,
+ Iterable<QueryFunction> extraFunctions,
+ String parserPrefix,
WalkableGraphFactory graphFactory,
- List<String> universeScope, PathPackageLocator pkgPath) {
+ List<String> universeScope,
+ PathPackageLocator pkgPath) {
super(
keepGoing,
/*strictScope=*/ true,
@@ -268,21 +275,25 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
// This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
// case of a race between the original callback and the eventHandler.
final BatchStreamedCallback aggregator =
- new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier());
+ new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE);
final AtomicBoolean empty = new AtomicBoolean(true);
+ Callback<Target> callbackWithEmptyCheck =
+ new Callback<Target>() {
+ @Override
+ public void process(Iterable<Target> partialResult)
+ throws QueryException, InterruptedException {
+ empty.compareAndSet(true, Iterables.isEmpty(partialResult));
+ aggregator.process(partialResult);
+ }
+ };
try (final AutoProfiler p = AutoProfiler.logged("evaluating query", LOG)) {
try {
- expr.eval(
- this,
- new Callback<Target>() {
- @Override
- public void process(Iterable<Target> partialResult)
- throws QueryException, InterruptedException {
- empty.compareAndSet(true, Iterables.isEmpty(partialResult));
- aggregator.process(partialResult);
- }
- });
+ if (canEvalConcurrently(expr)) {
+ expr.evalConcurrently(this, callbackWithEmptyCheck, threadPool);
+ } else {
+ expr.eval(this, callbackWithEmptyCheck);
+ }
} catch (QueryException e) {
throw new QueryException(e, expr);
}
@@ -293,17 +304,37 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
if (!keepGoing) {
// This case represents loading-phase errors reported during evaluation
// of target patterns that don't cause evaluation to fail per se.
- throw new QueryException("Evaluation of query \"" + expr
- + "\" failed due to BUILD file errors");
+ throw new QueryException(
+ "Evaluation of query \"" + expr + "\" failed due to BUILD file errors");
} else {
- eventHandler.handle(Event.warn("--keep_going specified, ignoring errors. "
- + "Results may be inaccurate"));
+ eventHandler.handle(
+ Event.warn("--keep_going specified, ignoring errors. " + "Results may be inaccurate"));
}
}
return new QueryEvalResult(!eventHandler.hasErrors(), empty.get());
}
+ // TODO(mschaller): This method and its use above are a quick temporary fix to a threadsafety
+ // problem that can happen when the operands of a BinaryOperatorExpression contain LetExpressions.
+ // Namely, concurrent reads and writes to AbstractBlazeQueryEnvironment#letBindings may fail or
+ // produce the wrong results.
+ // For now, this limits concurrent query expression evaluation to BinaryOperatorExpressions with
+ // TargetLiteral operands.
+ private static boolean canEvalConcurrently(QueryExpression expr) {
+ if (!(expr instanceof BinaryOperatorExpression)) {
+ return false;
+ }
+ BinaryOperatorExpression binaryExpr = (BinaryOperatorExpression) expr;
+ ImmutableList<QueryExpression> operands = binaryExpr.getOperands();
+ for (QueryExpression operand : operands) {
+ if (!(operand instanceof TargetLiteral)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
private Map<Target, Collection<Target>> makeTargetsMap(Map<SkyKey, Iterable<SkyKey>> input) {
ImmutableMap.Builder<Target, Collection<Target>> result = ImmutableMap.builder();
@@ -447,18 +478,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
expr.eval(this, callback);
}
- private static Uniquifier<Target> uniquifier() {
- return new AbstractUniquifier<Target, Label>() {
- @Override
- protected Label extractKey(Target target) {
- return target.getLabel();
- }
- };
- }
-
@Override
public Uniquifier<Target> createUniquifier() {
- return uniquifier();
+ return new ConcurrentUniquifier();
}
@Override
@@ -841,6 +863,26 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
.build();
}
+ @ThreadSafe
+ private static class ConcurrentUniquifier implements Uniquifier<Target> {
+
+ // Note that setting initialCapacity to BATCH_CALLBACK_SIZE is not especially principled.
+ private final Set<Label> seen =
+ Collections.newSetFromMap(
+ new ConcurrentHashMap<Label, Boolean>(BATCH_CALLBACK_SIZE, .75f, DEFAULT_THREAD_COUNT));
+
+ @Override
+ public ImmutableList<Target> unique(Iterable<Target> newElements) {
+ ImmutableList.Builder<Target> builder = ImmutableList.builder();
+ for (Target newElement : newElements) {
+ if (seen.add(newElement.getLabel())) {
+ builder.add(newElement);
+ }
+ }
+ return builder.build();
+ }
+ }
+
/**
* Wraps a {@link Callback} and guarantees that all calls to the original will have at least
* {@code batchThreshold} {@link Target}s, except for the final such call.
@@ -850,36 +892,44 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
* <p>After this object's {@link #process} has been called for the last time, {@link
* #processLastPending} must be called to "flush" any remaining {@link Target}s through to the
* original.
+ *
+ * <p>This callback may be called from multiple threads concurrently. The wrapped {@code
+ * callback} is invoked by at most one thread at a time.
*/
+ @ThreadSafe
private static class BatchStreamedCallback implements Callback<Target> {
private final Callback<Target> callback;
- private final Uniquifier<Target> uniquifier;
+ private final Uniquifier<Target> uniquifier = new ConcurrentUniquifier();
+ private final Object pendingLock = new Object();
private List<Target> pending = new ArrayList<>();
private int batchThreshold;
- private BatchStreamedCallback(Callback<Target> callback, int batchThreshold,
- Uniquifier<Target> uniquifier) {
+ private BatchStreamedCallback(Callback<Target> callback, int batchThreshold) {
this.callback = callback;
this.batchThreshold = batchThreshold;
- this.uniquifier = uniquifier;
}
@Override
public void process(Iterable<Target> partialResult)
throws QueryException, InterruptedException {
- Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
- pending.addAll(uniquifier.unique(partialResult));
- if (pending.size() >= batchThreshold) {
- callback.process(pending);
- pending = new ArrayList<>();
+ ImmutableList<Target> uniquifiedTargets = uniquifier.unique(partialResult);
+ synchronized (pendingLock) {
+ Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
+ pending.addAll(uniquifiedTargets);
+ if (pending.size() >= batchThreshold) {
+ callback.process(pending);
+ pending = new ArrayList<>();
+ }
}
}
private void processLastPending() throws QueryException, InterruptedException {
- if (!pending.isEmpty()) {
- callback.process(pending);
- pending = null;
+ synchronized (pendingLock) {
+ if (!pending.isEmpty()) {
+ callback.process(pending);
+ pending = null;
+ }
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
index 66555c7bee..c1d74c9873 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
@@ -14,12 +14,19 @@
package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind;
import com.google.devtools.build.lib.util.Preconditions;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
/**
* A binary algebraic set operation.
@@ -33,7 +40,7 @@ import java.util.Set;
* | expr ('-' expr)+
* </pre>
*/
-class BinaryOperatorExpression extends QueryExpression {
+public class BinaryOperatorExpression extends QueryExpression {
private final Lexer.TokenKind operator; // ::= INTERSECT/CARET | UNION/PLUS | EXCEPT/MINUS
private final ImmutableList<QueryExpression> operands;
@@ -49,17 +56,62 @@ class BinaryOperatorExpression extends QueryExpression {
return operator;
}
- ImmutableList<QueryExpression> getOperands() {
+ public ImmutableList<QueryExpression> getOperands() {
return operands;
}
@Override
public <T> void eval(QueryEnvironment<T> env, Callback<T> callback)
throws QueryException, InterruptedException {
+ evalConcurrently(env, callback, MoreExecutors.newDirectExecutorService());
+ }
+ @Override
+ public <T> void evalConcurrently(
+ final QueryEnvironment<T> env,
+ final Callback<T> callback,
+ ListeningExecutorService executorService)
+ throws QueryException, InterruptedException {
if (operator == TokenKind.PLUS || operator == TokenKind.UNION) {
- for (QueryExpression operand : operands) {
- env.eval(operand, callback);
+ final AtomicReference<InterruptedException> interruptRef = new AtomicReference<>();
+ final AtomicReference<QueryException> queryExceptionRef = new AtomicReference<>();
+ ArrayList<ListenableFuture<?>> futures = new ArrayList<>(operands.size());
+ for (final QueryExpression operand : operands) {
+ // When executorService has an implementation that evaluates runnables in a non-serial
+ // order, like a fixedSizeThreadPool, the following code does not guarantee that operands'
+ // targets are emitted via the callback in the operands' order. And that's OK!
+ // BinaryOperatorExpression is a set operation. The query documentation states
+ // that set operations don't introduce any ordering constraints of their own.
+ //
+ // Ordering constraints for other kinds of expressions are enforced by the query
+ // environment.
+ futures.add(
+ executorService.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ env.eval(operand, callback);
+ } catch (QueryException e) {
+ queryExceptionRef.compareAndSet(null, e);
+ } catch (InterruptedException e) {
+ interruptRef.compareAndSet(null, e);
+ }
+ }
+ }));
+ }
+ try {
+ Futures.allAsList(futures).get();
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(e);
+ }
+ InterruptedException interruptedExceptionIfAny = interruptRef.get();
+ if (interruptedExceptionIfAny != null) {
+ throw interruptedExceptionIfAny;
+ }
+ QueryException queryException = queryExceptionRef.get();
+ if (queryException != null) {
+ throw queryException;
}
return;
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
index b29071329b..e93b397299 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java
@@ -13,6 +13,8 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
import java.util.Collection;
/**
@@ -71,6 +73,20 @@ public abstract class QueryExpression {
throws QueryException, InterruptedException;
/**
+ * Evaluates this query in the specified environment, as in {@link
+ * #eval(QueryEnvironment, Callback)}. If the query expression supports concurrent evaluation, it
+ * may employ {@code executorService}.
+ *
+ * <p>The caller must ensure that both {@code env} and {@code callback} are effectively
+ * threadsafe. The query expression may call their methods from multiple threads.
+ */
+ public <T> void evalConcurrently(
+ QueryEnvironment<T> env, Callback<T> callback, ListeningExecutorService executorService)
+ throws QueryException, InterruptedException {
+ this.eval(env, callback);
+ }
+
+ /**
* Collects all target patterns that are referenced anywhere within this query expression and adds
* them to the given collection, which must be mutable.
*/