aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java116
1 files changed, 87 insertions, 29 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index ac3e2a1be6..5f7447b198 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -18,12 +18,15 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.collect.CompactHashSet;
import com.google.devtools.build.lib.concurrent.MoreFutures;
+import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.Callback;
@@ -69,13 +72,14 @@ class ParallelSkyQueryUtils {
QueryExpression expression,
VariableContext<Target> context,
ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool)
+ ForkJoinPool forkJoinPool,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore)
throws QueryException, InterruptedException {
env.eval(
expression,
context,
new SkyKeyBFSVisitorCallback(
- new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool)));
+ new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool, packageSemaphore)));
}
/** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */
@@ -83,24 +87,29 @@ class ParallelSkyQueryUtils {
SkyQueryEnvironment env,
Collection<PathFragment> fileIdentifiers,
ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool)
+ ForkJoinPool forkJoinPool,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore)
throws QueryException, InterruptedException {
ThreadSafeUniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
- RBuildFilesVisitor visitor = new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback);
+ RBuildFilesVisitor visitor =
+ new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback, packageSemaphore);
visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers));
}
/** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */
private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor<SkyKey> {
private final SkyQueryEnvironment env;
+ private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private RBuildFilesVisitor(
SkyQueryEnvironment env,
ForkJoinPool forkJoinPool,
ThreadSafeUniquifier<SkyKey> uniquifier,
- Callback<Target> callback) {
+ Callback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
super(forkJoinPool, uniquifier, callback);
this.env = env;
+ this.packageSemaphore = packageSemaphore;
}
@Override
@@ -125,10 +134,21 @@ class ParallelSkyQueryUtils {
}
@Override
- protected Iterable<Target> getTargetsToAddToResult(Iterable<SkyKey> keysToUseForResult)
- throws InterruptedException {
- return SkyQueryEnvironment.getBuildFilesForPackageValues(
- env.graph.getSuccessfulValues(keysToUseForResult).values());
+ protected void processResultantTargets(
+ Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+ throws QueryException, InterruptedException {
+ Set<PackageIdentifier> pkgIdsNeededForResult =
+ ImmutableSet.copyOf(
+ Iterables.transform(
+ keysToUseForResult,
+ SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER));
+ packageSemaphore.acquireAll(pkgIdsNeededForResult);
+ try {
+ callback.process(SkyQueryEnvironment.getBuildFilesForPackageValues(
+ env.graph.getSuccessfulValues(keysToUseForResult).values()));
+ } finally {
+ packageSemaphore.releaseAll(pkgIdsNeededForResult);
+ }
}
@Override
@@ -152,14 +172,17 @@ class ParallelSkyQueryUtils {
private static class AllRdepsUnboundedVisitor
extends AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> {
private final SkyQueryEnvironment env;
+ private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private AllRdepsUnboundedVisitor(
SkyQueryEnvironment env,
ForkJoinPool forkJoinPool,
ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier,
- ThreadSafeCallback<Target> callback) {
+ ThreadSafeCallback<Target> callback,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
super(forkJoinPool, uniquifier, callback);
this.env = env;
+ this.packageSemaphore = packageSemaphore;
}
/**
@@ -174,20 +197,24 @@ class ParallelSkyQueryUtils {
private final ForkJoinPool forkJoinPool;
private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier;
private final ThreadSafeCallback<Target> callback;
+ private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private Factory(
SkyQueryEnvironment env,
ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool) {
+ ForkJoinPool forkJoinPool,
+ MultisetSemaphore<PackageIdentifier> packageSemaphore) {
this.env = env;
this.forkJoinPool = forkJoinPool;
this.uniquifier = env.createReverseDepSkyKeyUniquifier();
this.callback = callback;
+ this.packageSemaphore = packageSemaphore;
}
@Override
public AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> create() {
- return new AllRdepsUnboundedVisitor(env, forkJoinPool, uniquifier, callback);
+ return new AllRdepsUnboundedVisitor(
+ env, forkJoinPool, uniquifier, callback, packageSemaphore);
}
}
@@ -213,13 +240,27 @@ class ParallelSkyQueryUtils {
reverseDepsMap.get(reverseDepPair.first).add(reverseDepPair.second);
}
- // Filter out disallowed deps. We cannot defer the targetification any further as we do not
- // want to retrieve the rdeps of unwanted nodes (targets).
- if (!reverseDepsMap.isEmpty()) {
- Collection<Target> filteredTargets =
- env.filterRawReverseDepsOfTransitiveTraversalKeys(reverseDepsMap);
- filteredKeys.addAll(
- Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY));
+ Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+ env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values()));
+ Set<PackageIdentifier> pkgIdsNeededForTargetification =
+ ImmutableSet.copyOf(
+ Iterables.transform(
+ packageKeyToTargetKeyMap.keySet(),
+ SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER));
+ packageSemaphore.acquireAll(pkgIdsNeededForTargetification);
+
+ try {
+ // Filter out disallowed deps. We cannot defer the targetification any further as we do not
+ // want to retrieve the rdeps of unwanted nodes (targets).
+ if (!reverseDepsMap.isEmpty()) {
+ Collection<Target> filteredTargets =
+ env.filterRawReverseDepsOfTransitiveTraversalKeys(
+ reverseDepsMap, packageKeyToTargetKeyMap);
+ filteredKeys.addAll(
+ Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY));
+ }
+ } finally {
+ packageSemaphore.releaseAll(pkgIdsNeededForTargetification);
}
// Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next
@@ -252,9 +293,23 @@ class ParallelSkyQueryUtils {
}
@Override
- protected Iterable<Target> getTargetsToAddToResult(Iterable<SkyKey> keysToUseForResult)
- throws InterruptedException {
- return env.makeTargetsFromSkyKeys(keysToUseForResult).values();
+ protected void processResultantTargets(
+ Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+ throws QueryException, InterruptedException {
+ Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap =
+ env.makePackageKeyToTargetKeyMap(keysToUseForResult);
+ Set<PackageIdentifier> pkgIdsNeededForResult =
+ ImmutableSet.copyOf(
+ Iterables.transform(
+ packageKeyToTargetKeyMap.keySet(),
+ SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER));
+ packageSemaphore.acquireAll(pkgIdsNeededForResult);
+ try {
+ callback.process(
+ env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values());
+ } finally {
+ packageSemaphore.releaseAll(pkgIdsNeededForResult);
+ }
}
@Override
@@ -294,7 +349,7 @@ class ParallelSkyQueryUtils {
/**
* A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link
- * ForkJoinQuiescingExecutor}.
+ * ForkJoinPool}.
*
* <p>The choice of {@link ForkJoinPool} over, say, AbstractQueueVisitor backed by a
* ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off a
@@ -310,7 +365,9 @@ class ParallelSkyQueryUtils {
private static final int VISIT_BATCH_SIZE = 10000;
private AbstractSkyKeyBFSVisitor(
- ForkJoinPool forkJoinPool, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) {
+ ForkJoinPool forkJoinPool,
+ ThreadSafeUniquifier<T> uniquifier,
+ Callback<Target> callback) {
this.forkJoinPool = forkJoinPool;
this.uniquifier = uniquifier;
this.callback = callback;
@@ -402,7 +459,7 @@ class ParallelSkyQueryUtils {
@Override
protected void computeImpl() throws QueryException, InterruptedException {
- callback.process(getTargetsToAddToResult(keysToUseForResult));
+ processResultantTargets(keysToUseForResult, callback);
}
}
@@ -425,11 +482,12 @@ class ParallelSkyQueryUtils {
}
/**
- * Gets the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in the
- * full visitation.
+ * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s
+ * in the full visitation to the given {@link Callback}.
*/
- protected abstract Iterable<Target> getTargetsToAddToResult(
- Iterable<SkyKey> keysToUseForResult) throws InterruptedException;
+ protected abstract void processResultantTargets(
+ Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+ throws QueryException, InterruptedException;
/** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException;