diff options
author | 2017-10-31 11:23:04 -0400 | |
---|---|---|
committer | 2017-11-01 09:58:19 -0400 | |
commit | 1bd4aafd1b15851da7929d279ff4dce78a9bd928 (patch) | |
tree | 98f2112d6b7fb0b15106764afbeef459d1ae5b94 /src/main/java/com/google | |
parent | 33fc33ff64b754674c6a6701824fb68ae3bc24f2 (diff) |
Cosmetic refactor of some of the helper methods used by ParallelSkyQueryUtils.RBuildFilesVisitor. Also a minor tweak of the batch size for feeding results to the callback.
Also correctly use the packageSemaphore in SkyQueryEnvironment's non-parallel implementation of rbuildfiles.
RELNOTES: None
PiperOrigin-RevId: 174039067
Diffstat (limited to 'src/main/java/com/google')
3 files changed, 49 insertions, 31 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 375e7e2eca..1948cb0c53 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -23,7 +23,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Streams; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet; @@ -84,28 +83,28 @@ class ParallelSkyQueryUtils { static void getRBuildFilesParallel( SkyQueryEnvironment env, Collection<PathFragment> fileIdentifiers, - Callback<Target> callback, - MultisetSemaphore<PackageIdentifier> packageSemaphore) - throws QueryException, InterruptedException { + Callback<Target> callback) throws QueryException, InterruptedException { Uniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier(); RBuildFilesVisitor visitor = - new RBuildFilesVisitor(env, keyUniquifier, callback, packageSemaphore); + new RBuildFilesVisitor(env, keyUniquifier, callback); visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers)); } /** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */ private static class RBuildFilesVisitor extends ParallelVisitor<SkyKey, Target> { + // Each target in the full output of 'rbuildfiles' corresponds to BUILD file InputFile of a + // unique package. So the processResultsBatchSize we choose to pass to the ParallelVisitor ctor + // influences how many packages each leaf task doing processPartialResults will have to + // deal with at once. A value of 100 was chosen experimentally. + private static final int PROCESS_RESULTS_BATCH_SIZE = 100; private final SkyQueryEnvironment env; - private final MultisetSemaphore<PackageIdentifier> packageSemaphore; private RBuildFilesVisitor( SkyQueryEnvironment env, Uniquifier<SkyKey> uniquifier, - Callback<Target> callback, - MultisetSemaphore<PackageIdentifier> packageSemaphore) { - super(uniquifier, callback, VISIT_BATCH_SIZE); + Callback<Target> callback) { + super(uniquifier, callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE); this.env = env; - this.packageSemaphore = packageSemaphore; } @Override @@ -135,17 +134,7 @@ class ParallelSkyQueryUtils { protected void processPartialResults( Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) throws QueryException, InterruptedException { - Set<PackageIdentifier> pkgIdsNeededForResult = - Streams.stream(keysToUseForResult) - .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) - .collect(toImmutableSet()); - packageSemaphore.acquireAll(pkgIdsNeededForResult); - try { - callback.process(SkyQueryEnvironment.getBuildFilesForPackageValues( - env.graph.getSuccessfulValues(keysToUseForResult).values())); - } finally { - packageSemaphore.releaseAll(pkgIdsNeededForResult); - } + env.getBuildFileTargetsForPackageKeysAndProcessViaCallback(keysToUseForResult, callback); } @Override @@ -168,6 +157,7 @@ class ParallelSkyQueryUtils { */ private static class AllRdepsUnboundedVisitor extends ParallelVisitor<Pair<SkyKey, SkyKey>, Target> { + private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE; private final SkyQueryEnvironment env; private final MultisetSemaphore<PackageIdentifier> packageSemaphore; @@ -176,7 +166,7 @@ class ParallelSkyQueryUtils { Uniquifier<Pair<SkyKey, SkyKey>> uniquifier, Callback<Target> callback, MultisetSemaphore<PackageIdentifier> packageSemaphore) { - super(uniquifier, callback, VISIT_BATCH_SIZE); + super(uniquifier, callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE); this.env = env; this.packageSemaphore = packageSemaphore; } diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java index e81aadd162..e1e01d5fef 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java @@ -49,6 +49,7 @@ public abstract class ParallelVisitor<T, V> { private final Uniquifier<T> uniquifier; private final Callback<V> callback; private final int visitBatchSize; + private final int processResultsBatchSize; private final VisitingTaskExecutor executor; @@ -117,10 +118,15 @@ public abstract class ParallelVisitor<T, V> { /*workQueue=*/ new BlockingStack<Runnable>(), new ThreadFactoryBuilder().setNameFormat("parallel-visitor %d").build()); - protected ParallelVisitor(Uniquifier<T> uniquifier, Callback<V> callback, int visitBatchSize) { + protected ParallelVisitor( + Uniquifier<T> uniquifier, + Callback<V> callback, + int visitBatchSize, + int processResultsBatchSize) { this.uniquifier = uniquifier; this.callback = callback; this.visitBatchSize = visitBatchSize; + this.processResultsBatchSize = processResultsBatchSize; this.executor = new VisitingTaskExecutor(FIXED_THREAD_POOL_EXECUTOR, PARALLEL_VISITOR_ERROR_CLASSIFIER); } @@ -212,7 +218,7 @@ public abstract class ParallelVisitor<T, V> { Visit visit = getVisitResult(uniqueKeys); for (Iterable<SkyKey> keysToUseForResultBatch : - Iterables.partition(visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) { + Iterables.partition(visit.keysToUseForResult, processResultsBatchSize)) { executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch)); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 6a9d2df3a5..88072aa996 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -13,6 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.query2; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.common.annotations.VisibleForTesting; @@ -31,6 +32,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import com.google.common.collect.Streams; import com.google.common.util.concurrent.AsyncCallable; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; @@ -137,7 +139,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> implements StreamableQueryEnvironment<Target> { // 10k is likely a good balance between using batch efficiently and not blowing up memory. // TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant. - static final int BATCH_CALLBACK_SIZE = 10000; + protected static final int BATCH_CALLBACK_SIZE = 10000; protected static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors(); private static final int MAX_QUERY_EXPRESSION_LOG_CHARS = 1000; private static final Logger logger = Logger.getLogger(SkyQueryEnvironment.class.getName()); @@ -1045,7 +1047,26 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } return result; } - static Iterable<Target> getBuildFilesForPackageValues(Iterable<SkyValue> packageValues) { + + void getBuildFileTargetsForPackageKeysAndProcessViaCallback( + Iterable<SkyKey> packageKeys, Callback<Target> callback) + throws QueryException, InterruptedException { + Set<PackageIdentifier> pkgIds = + Streams.stream(packageKeys) + .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) + .collect(toImmutableSet()); + packageSemaphore.acquireAll(pkgIds); + try { + Iterable<SkyValue> packageValues = graph.getSuccessfulValues(packageKeys).values(); + Iterable<Target> buildFileTargets = getBuildFileTargetsFromPackageValues(packageValues); + callback.process(buildFileTargets); + } finally { + packageSemaphore.releaseAll(pkgIds); + } + } + + protected Iterable<Target> getBuildFileTargetsFromPackageValues( + Iterable<SkyValue> packageValues) { // TODO(laurentlb): Use streams? return Iterables.transform( Iterables.filter( @@ -1062,7 +1083,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> safeSubmit( () -> { ParallelSkyQueryUtils.getRBuildFilesParallel( - SkyQueryEnvironment.this, fileIdentifiers, callback, packageSemaphore); + SkyQueryEnvironment.this, fileIdentifiers, callback); return null; })); } @@ -1103,14 +1124,12 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } if (resultKeys.size() >= BATCH_CALLBACK_SIZE) { for (Iterable<SkyKey> batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) { - callback.process( - getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values())); + getBuildFileTargetsForPackageKeysAndProcessViaCallback(batch, callback); } resultKeys.clear(); } } - callback.process( - getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values())); + getBuildFileTargetsForPackageKeysAndProcessViaCallback(resultKeys, callback); return immediateSuccessfulFuture(null); } catch (QueryException e) { return immediateFailedFuture(e); @@ -1189,6 +1208,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> // TODO(nharmata): For queries with less than {@code batchThreshold} results, this batching // strategy probably hurts performance since we can only start formatting results once the entire // query is finished. + // TODO(nharmata): This batching strategy is also potentially harmful from a memory perspective + // since when the Targets being output are backed by Package instances, we're delaying GC of the + // Package instances until the output batch size is met. private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback<Target> implements Callback<Target> { |