From 41b541754cb4176a8b39426c00bffb27cc9f57b6 Mon Sep 17 00:00:00 2001 From: Nathan Harmata Date: Thu, 10 Nov 2016 18:54:09 +0000 Subject: Add a mechanism for bounding the number of Packages SkyQueryEnvironment's expensive parallel operations can operate on at once. -- MOS_MIGRATED_REVID=138779172 --- .../build/lib/query2/ParallelSkyQueryUtils.java | 116 +++++++++++++++------ .../build/lib/query2/SkyQueryEnvironment.java | 58 +++++++++-- ...PackageProviderBackedTargetPatternResolver.java | 40 ++++--- .../build/lib/skyframe/TargetPatternFunction.java | 12 ++- 4 files changed, 170 insertions(+), 56 deletions(-) (limited to 'src/main/java/com/google/devtools/build/lib') diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index ac3e2a1be6..5f7447b198 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -18,12 +18,15 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.CompactHashSet; import com.google.devtools.build.lib.concurrent.MoreFutures; +import com.google.devtools.build.lib.concurrent.MultisetSemaphore; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; @@ -69,13 +72,14 @@ class ParallelSkyQueryUtils { QueryExpression expression, VariableContext context, ThreadSafeCallback callback, - ForkJoinPool forkJoinPool) + ForkJoinPool forkJoinPool, + MultisetSemaphore packageSemaphore) throws QueryException, InterruptedException { env.eval( expression, context, new SkyKeyBFSVisitorCallback( - new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool))); + new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool, packageSemaphore))); } /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */ @@ -83,24 +87,29 @@ class ParallelSkyQueryUtils { SkyQueryEnvironment env, Collection fileIdentifiers, ThreadSafeCallback callback, - ForkJoinPool forkJoinPool) + ForkJoinPool forkJoinPool, + MultisetSemaphore packageSemaphore) throws QueryException, InterruptedException { ThreadSafeUniquifier keyUniquifier = env.createSkyKeyUniquifier(); - RBuildFilesVisitor visitor = new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback); + RBuildFilesVisitor visitor = + new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback, packageSemaphore); visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers)); } /** A helper class that computes 'rbuildfiles()' via BFS. */ private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor { private final SkyQueryEnvironment env; + private final MultisetSemaphore packageSemaphore; private RBuildFilesVisitor( SkyQueryEnvironment env, ForkJoinPool forkJoinPool, ThreadSafeUniquifier uniquifier, - Callback callback) { + Callback callback, + MultisetSemaphore packageSemaphore) { super(forkJoinPool, uniquifier, callback); this.env = env; + this.packageSemaphore = packageSemaphore; } @Override @@ -125,10 +134,21 @@ class ParallelSkyQueryUtils { } @Override - protected Iterable getTargetsToAddToResult(Iterable keysToUseForResult) - throws InterruptedException { - return SkyQueryEnvironment.getBuildFilesForPackageValues( - env.graph.getSuccessfulValues(keysToUseForResult).values()); + protected void processResultantTargets( + Iterable keysToUseForResult, Callback callback) + throws QueryException, InterruptedException { + Set pkgIdsNeededForResult = + ImmutableSet.copyOf( + Iterables.transform( + keysToUseForResult, + SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)); + packageSemaphore.acquireAll(pkgIdsNeededForResult); + try { + callback.process(SkyQueryEnvironment.getBuildFilesForPackageValues( + env.graph.getSuccessfulValues(keysToUseForResult).values())); + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForResult); + } } @Override @@ -152,14 +172,17 @@ class ParallelSkyQueryUtils { private static class AllRdepsUnboundedVisitor extends AbstractSkyKeyBFSVisitor> { private final SkyQueryEnvironment env; + private final MultisetSemaphore packageSemaphore; private AllRdepsUnboundedVisitor( SkyQueryEnvironment env, ForkJoinPool forkJoinPool, ThreadSafeUniquifier> uniquifier, - ThreadSafeCallback callback) { + ThreadSafeCallback callback, + MultisetSemaphore packageSemaphore) { super(forkJoinPool, uniquifier, callback); this.env = env; + this.packageSemaphore = packageSemaphore; } /** @@ -174,20 +197,24 @@ class ParallelSkyQueryUtils { private final ForkJoinPool forkJoinPool; private final ThreadSafeUniquifier> uniquifier; private final ThreadSafeCallback callback; + private final MultisetSemaphore packageSemaphore; private Factory( SkyQueryEnvironment env, ThreadSafeCallback callback, - ForkJoinPool forkJoinPool) { + ForkJoinPool forkJoinPool, + MultisetSemaphore packageSemaphore) { this.env = env; this.forkJoinPool = forkJoinPool; this.uniquifier = env.createReverseDepSkyKeyUniquifier(); this.callback = callback; + this.packageSemaphore = packageSemaphore; } @Override public AbstractSkyKeyBFSVisitor> create() { - return new AllRdepsUnboundedVisitor(env, forkJoinPool, uniquifier, callback); + return new AllRdepsUnboundedVisitor( + env, forkJoinPool, uniquifier, callback, packageSemaphore); } } @@ -213,13 +240,27 @@ class ParallelSkyQueryUtils { reverseDepsMap.get(reverseDepPair.first).add(reverseDepPair.second); } - // Filter out disallowed deps. We cannot defer the targetification any further as we do not - // want to retrieve the rdeps of unwanted nodes (targets). - if (!reverseDepsMap.isEmpty()) { - Collection filteredTargets = - env.filterRawReverseDepsOfTransitiveTraversalKeys(reverseDepsMap); - filteredKeys.addAll( - Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY)); + Multimap packageKeyToTargetKeyMap = + env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values())); + Set pkgIdsNeededForTargetification = + ImmutableSet.copyOf( + Iterables.transform( + packageKeyToTargetKeyMap.keySet(), + SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)); + packageSemaphore.acquireAll(pkgIdsNeededForTargetification); + + try { + // Filter out disallowed deps. We cannot defer the targetification any further as we do not + // want to retrieve the rdeps of unwanted nodes (targets). + if (!reverseDepsMap.isEmpty()) { + Collection filteredTargets = + env.filterRawReverseDepsOfTransitiveTraversalKeys( + reverseDepsMap, packageKeyToTargetKeyMap); + filteredKeys.addAll( + Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY)); + } + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForTargetification); } // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next @@ -252,9 +293,23 @@ class ParallelSkyQueryUtils { } @Override - protected Iterable getTargetsToAddToResult(Iterable keysToUseForResult) - throws InterruptedException { - return env.makeTargetsFromSkyKeys(keysToUseForResult).values(); + protected void processResultantTargets( + Iterable keysToUseForResult, Callback callback) + throws QueryException, InterruptedException { + Multimap packageKeyToTargetKeyMap = + env.makePackageKeyToTargetKeyMap(keysToUseForResult); + Set pkgIdsNeededForResult = + ImmutableSet.copyOf( + Iterables.transform( + packageKeyToTargetKeyMap.keySet(), + SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)); + packageSemaphore.acquireAll(pkgIdsNeededForResult); + try { + callback.process( + env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values()); + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForResult); + } } @Override @@ -294,7 +349,7 @@ class ParallelSkyQueryUtils { /** * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link - * ForkJoinQuiescingExecutor}. + * ForkJoinPool}. * *

The choice of {@link ForkJoinPool} over, say, AbstractQueueVisitor backed by a * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off a @@ -310,7 +365,9 @@ class ParallelSkyQueryUtils { private static final int VISIT_BATCH_SIZE = 10000; private AbstractSkyKeyBFSVisitor( - ForkJoinPool forkJoinPool, ThreadSafeUniquifier uniquifier, Callback callback) { + ForkJoinPool forkJoinPool, + ThreadSafeUniquifier uniquifier, + Callback callback) { this.forkJoinPool = forkJoinPool; this.uniquifier = uniquifier; this.callback = callback; @@ -402,7 +459,7 @@ class ParallelSkyQueryUtils { @Override protected void computeImpl() throws QueryException, InterruptedException { - callback.process(getTargetsToAddToResult(keysToUseForResult)); + processResultantTargets(keysToUseForResult, callback); } } @@ -425,11 +482,12 @@ class ParallelSkyQueryUtils { } /** - * Gets the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in the - * full visitation. + * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s + * in the full visitation to the given {@link Callback}. */ - protected abstract Iterable getTargetsToAddToResult( - Iterable keysToUseForResult) throws InterruptedException; + protected abstract void processResultantTargets( + Iterable keysToUseForResult, Callback callback) + throws QueryException, InterruptedException; /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */ protected abstract Visit getVisitResult(Iterable values) throws InterruptedException; diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index c5efe91a51..a3f7c6df60 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -34,6 +34,7 @@ import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPattern; import com.google.devtools.build.lib.collect.CompactHashSet; +import com.google.devtools.build.lib.concurrent.MultisetSemaphore; import com.google.devtools.build.lib.concurrent.NamedForkJoinPool; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.events.Event; @@ -132,6 +133,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment private final int queryEvaluationParallelismLevel; // The following fields are set in the #beforeEvaluateQuery method. + private MultisetSemaphore packageSemaphore; protected WalkableGraph graph; private InterruptibleSupplier> blacklistPatternsSupplier; private ForkJoinPool forkJoinPool; @@ -205,6 +207,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment } checkEvaluationResult(result); + packageSemaphore = makeFreshPackageMultisetSemaphore(); graph = result.getWalkableGraph(); blacklistPatternsSupplier = InterruptibleSupplier.Memoize.of(new BlacklistSupplier(graph)); @@ -220,7 +223,17 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment graphBackedRecursivePackageProvider, eventHandler, TargetPatternEvaluator.DEFAULT_FILTERING_POLICY, - forkJoinPool); + forkJoinPool, + packageSemaphore); + } + + protected MultisetSemaphore makeFreshPackageMultisetSemaphore() { + return MultisetSemaphore.unbounded(); + } + + @ThreadSafe + public MultisetSemaphore getPackageMultisetSemaphore() { + return packageSemaphore; } /** @@ -342,10 +355,18 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment private Map> targetifyValues( Map> input) throws InterruptedException { + return targetifyValues( + input, + makePackageKeyToTargetKeyMap(ImmutableSet.copyOf(Iterables.concat(input.values())))); + } + + private Map> targetifyValues( + Map> input, + Multimap packageKeyToTargetKeyMap) throws InterruptedException { ImmutableMap.Builder> result = ImmutableMap.builder(); Map allTargets = - makeTargetsFromSkyKeys(Sets.newHashSet(Iterables.concat(input.values()))); + makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap); for (Map.Entry> entry : input.entrySet()) { Iterable skyKeys = entry.getValue(); @@ -446,8 +467,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment /** Targetify SkyKeys of reverse deps and filter out targets whose deps are not allowed. */ Collection filterRawReverseDepsOfTransitiveTraversalKeys( - Map> rawReverseDeps) throws InterruptedException { - return processRawReverseDeps(targetifyValues(rawReverseDeps)); + Map> rawReverseDeps, + Multimap packageKeyToTargetKeyMap) throws InterruptedException { + return processRawReverseDeps(targetifyValues(rawReverseDeps, packageKeyToTargetKeyMap)); } private Collection processRawReverseDeps(Map> rawReverseDeps) @@ -752,9 +774,16 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment } }; + static final Function PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER = + new Function() { + @Override + public PackageIdentifier apply(SkyKey skyKey) { + return (PackageIdentifier) skyKey.argument(); + } + }; + @ThreadSafe - public Map makeTargetsFromSkyKeys(Iterable keys) - throws InterruptedException { + Multimap makePackageKeyToTargetKeyMap(Iterable keys) { Multimap packageKeyToTargetKeyMap = ArrayListMultimap.create(); for (SkyKey key : keys) { Label label = SKYKEY_TO_LABEL.apply(key); @@ -763,6 +792,18 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment } packageKeyToTargetKeyMap.put(PackageValue.key(label.getPackageIdentifier()), key); } + return packageKeyToTargetKeyMap; + } + + @ThreadSafe + public Map makeTargetsFromSkyKeys(Iterable keys) + throws InterruptedException { + return makeTargetsFromPackageKeyToTargetKeyMap(makePackageKeyToTargetKeyMap(keys)); + } + + @ThreadSafe + public Map makeTargetsFromPackageKeyToTargetKeyMap( + Multimap packageKeyToTargetKeyMap) throws InterruptedException { ImmutableMap.Builder result = ImmutableMap.builder(); Map packageMap = graph.getSuccessfulValues(packageKeyToTargetKeyMap.keySet()); for (Map.Entry entry : packageMap.entrySet()) { @@ -924,7 +965,8 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment ThreadSafeCallback callback, ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, forkJoinPool); + ParallelSkyQueryUtils.getRBuildFilesParallel( + this, fileIdentifiers, callback, forkJoinPool, packageSemaphore); } /** @@ -1107,7 +1149,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { ParallelSkyQueryUtils.getAllRdepsUnboundedParallel( - this, expression, context, callback, forkJoinPool); + this, expression, context, callback, forkJoinPool, packageSemaphore); } @ThreadSafe diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java index 62926c8830..007109860c 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java @@ -29,6 +29,7 @@ import com.google.devtools.build.lib.cmdline.ResolvedTargets; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPatternResolver; import com.google.devtools.build.lib.concurrent.MoreFutures; +import com.google.devtools.build.lib.concurrent.MultisetSemaphore; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; @@ -65,16 +66,19 @@ public class RecursivePackageProviderBackedTargetPatternResolver private final EventHandler eventHandler; private final FilteringPolicy policy; private final ExecutorService executor; + private final MultisetSemaphore packageSemaphore; public RecursivePackageProviderBackedTargetPatternResolver( RecursivePackageProvider recursivePackageProvider, EventHandler eventHandler, FilteringPolicy policy, - ExecutorService executor) { + ExecutorService executor, + MultisetSemaphore packageSemaphore) { this.recursivePackageProvider = recursivePackageProvider; this.eventHandler = eventHandler; this.policy = policy; this.executor = executor; + this.packageSemaphore = packageSemaphore; } @Override @@ -215,22 +219,28 @@ public class RecursivePackageProviderBackedTargetPatternResolver tasks.add(executor.submit(new Callable() { @Override public Void call() throws E, TargetParsingException, InterruptedException { - Iterable> resolvedTargets = - bulkGetTargetsInPackage(originalPattern, pkgIdBatch, NO_FILTER).values(); - List filteredTargets = new ArrayList<>(calculateSize(resolvedTargets)); - for (ResolvedTargets targets : resolvedTargets) { - for (Target target : targets.getTargets()) { - // Perform the no-targets-found check before applying the filtering policy - // so we only return the error if the input directory's subtree really - // contains no targets. - foundTarget.set(true); - if (actualPolicy.shouldRetain(target, false)) { - filteredTargets.add(target); + ImmutableSet pkgIdBatchSet = ImmutableSet.copyOf(pkgIdBatch); + packageSemaphore.acquireAll(pkgIdBatchSet); + try { + Iterable> resolvedTargets = + bulkGetTargetsInPackage(originalPattern, pkgIdBatch, NO_FILTER).values(); + List filteredTargets = new ArrayList<>(calculateSize(resolvedTargets)); + for (ResolvedTargets targets : resolvedTargets) { + for (Target target : targets.getTargets()) { + // Perform the no-targets-found check before applying the filtering policy + // so we only return the error if the input directory's subtree really + // contains no targets. + foundTarget.set(true); + if (actualPolicy.shouldRetain(target, false)) { + filteredTargets.add(target); + } } } - } - synchronized (callbackLock) { - callback.process(filteredTargets); + synchronized (callbackLock) { + callback.process(filteredTargets); + } + } finally { + packageSemaphore.releaseAll(pkgIdBatchSet); } return null; } diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/TargetPatternFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/TargetPatternFunction.java index 32f64e82ca..ac7580651a 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/TargetPatternFunction.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/TargetPatternFunction.java @@ -17,10 +17,12 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.cmdline.Label; +import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.ResolvedTargets; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPattern; import com.google.devtools.build.lib.collect.CompactHashSet; +import com.google.devtools.build.lib.concurrent.MultisetSemaphore; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.skyframe.EnvironmentBackedRecursivePackageProvider.MissingDepException; import com.google.devtools.build.lib.util.BatchCallback; @@ -30,9 +32,7 @@ import com.google.devtools.build.skyframe.SkyFunction; import com.google.devtools.build.skyframe.SkyFunctionException; import com.google.devtools.build.skyframe.SkyKey; import com.google.devtools.build.skyframe.SkyValue; - import java.util.Set; - import javax.annotation.Nullable; /** @@ -54,8 +54,12 @@ public class TargetPatternFunction implements SkyFunction { EnvironmentBackedRecursivePackageProvider provider = new EnvironmentBackedRecursivePackageProvider(env); RecursivePackageProviderBackedTargetPatternResolver resolver = - new RecursivePackageProviderBackedTargetPatternResolver(provider, env.getListener(), - patternKey.getPolicy(), MoreExecutors.newDirectExecutorService()); + new RecursivePackageProviderBackedTargetPatternResolver( + provider, + env.getListener(), + patternKey.getPolicy(), + MoreExecutors.newDirectExecutorService(), + MultisetSemaphore.unbounded()); TargetPattern parsedPattern = patternKey.getParsedPattern(); ImmutableSet excludedSubdirectories = patternKey.getExcludedSubdirectories(); final Set results = CompactHashSet.create(); -- cgit v1.2.3