From 1bd4aafd1b15851da7929d279ff4dce78a9bd928 Mon Sep 17 00:00:00 2001 From: nharmata Date: Tue, 31 Oct 2017 11:23:04 -0400 Subject: Cosmetic refactor of some of the helper methods used by ParallelSkyQueryUtils.RBuildFilesVisitor. Also a minor tweak of the batch size for feeding results to the callback. Also correctly use the packageSemaphore in SkyQueryEnvironment's non-parallel implementation of rbuildfiles. RELNOTES: None PiperOrigin-RevId: 174039067 --- .../build/lib/query2/ParallelSkyQueryUtils.java | 34 ++++++++------------ .../devtools/build/lib/query2/ParallelVisitor.java | 10 ++++-- .../build/lib/query2/SkyQueryEnvironment.java | 36 +++++++++++++++++----- 3 files changed, 49 insertions(+), 31 deletions(-) (limited to 'src/main') diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 375e7e2eca..1948cb0c53 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -23,7 +23,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; -import com.google.common.collect.Streams; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet; @@ -84,28 +83,28 @@ class ParallelSkyQueryUtils { static void getRBuildFilesParallel( SkyQueryEnvironment env, Collection fileIdentifiers, - Callback callback, - MultisetSemaphore packageSemaphore) - throws QueryException, InterruptedException { + Callback callback) throws QueryException, InterruptedException { Uniquifier keyUniquifier = env.createSkyKeyUniquifier(); RBuildFilesVisitor visitor = - new RBuildFilesVisitor(env, keyUniquifier, callback, packageSemaphore); + new RBuildFilesVisitor(env, keyUniquifier, callback); visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers)); } /** A helper class that computes 'rbuildfiles()' via BFS. */ private static class RBuildFilesVisitor extends ParallelVisitor { + // Each target in the full output of 'rbuildfiles' corresponds to BUILD file InputFile of a + // unique package. So the processResultsBatchSize we choose to pass to the ParallelVisitor ctor + // influences how many packages each leaf task doing processPartialResults will have to + // deal with at once. A value of 100 was chosen experimentally. + private static final int PROCESS_RESULTS_BATCH_SIZE = 100; private final SkyQueryEnvironment env; - private final MultisetSemaphore packageSemaphore; private RBuildFilesVisitor( SkyQueryEnvironment env, Uniquifier uniquifier, - Callback callback, - MultisetSemaphore packageSemaphore) { - super(uniquifier, callback, VISIT_BATCH_SIZE); + Callback callback) { + super(uniquifier, callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE); this.env = env; - this.packageSemaphore = packageSemaphore; } @Override @@ -135,17 +134,7 @@ class ParallelSkyQueryUtils { protected void processPartialResults( Iterable keysToUseForResult, Callback callback) throws QueryException, InterruptedException { - Set pkgIdsNeededForResult = - Streams.stream(keysToUseForResult) - .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) - .collect(toImmutableSet()); - packageSemaphore.acquireAll(pkgIdsNeededForResult); - try { - callback.process(SkyQueryEnvironment.getBuildFilesForPackageValues( - env.graph.getSuccessfulValues(keysToUseForResult).values())); - } finally { - packageSemaphore.releaseAll(pkgIdsNeededForResult); - } + env.getBuildFileTargetsForPackageKeysAndProcessViaCallback(keysToUseForResult, callback); } @Override @@ -168,6 +157,7 @@ class ParallelSkyQueryUtils { */ private static class AllRdepsUnboundedVisitor extends ParallelVisitor, Target> { + private static final int PROCESS_RESULTS_BATCH_SIZE = SkyQueryEnvironment.BATCH_CALLBACK_SIZE; private final SkyQueryEnvironment env; private final MultisetSemaphore packageSemaphore; @@ -176,7 +166,7 @@ class ParallelSkyQueryUtils { Uniquifier> uniquifier, Callback callback, MultisetSemaphore packageSemaphore) { - super(uniquifier, callback, VISIT_BATCH_SIZE); + super(uniquifier, callback, VISIT_BATCH_SIZE, PROCESS_RESULTS_BATCH_SIZE); this.env = env; this.packageSemaphore = packageSemaphore; } diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java index e81aadd162..e1e01d5fef 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java @@ -49,6 +49,7 @@ public abstract class ParallelVisitor { private final Uniquifier uniquifier; private final Callback callback; private final int visitBatchSize; + private final int processResultsBatchSize; private final VisitingTaskExecutor executor; @@ -117,10 +118,15 @@ public abstract class ParallelVisitor { /*workQueue=*/ new BlockingStack(), new ThreadFactoryBuilder().setNameFormat("parallel-visitor %d").build()); - protected ParallelVisitor(Uniquifier uniquifier, Callback callback, int visitBatchSize) { + protected ParallelVisitor( + Uniquifier uniquifier, + Callback callback, + int visitBatchSize, + int processResultsBatchSize) { this.uniquifier = uniquifier; this.callback = callback; this.visitBatchSize = visitBatchSize; + this.processResultsBatchSize = processResultsBatchSize; this.executor = new VisitingTaskExecutor(FIXED_THREAD_POOL_EXECUTOR, PARALLEL_VISITOR_ERROR_CLASSIFIER); } @@ -212,7 +218,7 @@ public abstract class ParallelVisitor { Visit visit = getVisitResult(uniqueKeys); for (Iterable keysToUseForResultBatch : - Iterables.partition(visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) { + Iterables.partition(visit.keysToUseForResult, processResultsBatchSize)) { executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch)); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 6a9d2df3a5..88072aa996 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -13,6 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.query2; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.common.annotations.VisibleForTesting; @@ -31,6 +32,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import com.google.common.collect.Streams; import com.google.common.util.concurrent.AsyncCallable; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; @@ -137,7 +139,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment implements StreamableQueryEnvironment { // 10k is likely a good balance between using batch efficiently and not blowing up memory. // TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant. - static final int BATCH_CALLBACK_SIZE = 10000; + protected static final int BATCH_CALLBACK_SIZE = 10000; protected static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors(); private static final int MAX_QUERY_EXPRESSION_LOG_CHARS = 1000; private static final Logger logger = Logger.getLogger(SkyQueryEnvironment.class.getName()); @@ -1045,7 +1047,26 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment } return result; } - static Iterable getBuildFilesForPackageValues(Iterable packageValues) { + + void getBuildFileTargetsForPackageKeysAndProcessViaCallback( + Iterable packageKeys, Callback callback) + throws QueryException, InterruptedException { + Set pkgIds = + Streams.stream(packageKeys) + .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) + .collect(toImmutableSet()); + packageSemaphore.acquireAll(pkgIds); + try { + Iterable packageValues = graph.getSuccessfulValues(packageKeys).values(); + Iterable buildFileTargets = getBuildFileTargetsFromPackageValues(packageValues); + callback.process(buildFileTargets); + } finally { + packageSemaphore.releaseAll(pkgIds); + } + } + + protected Iterable getBuildFileTargetsFromPackageValues( + Iterable packageValues) { // TODO(laurentlb): Use streams? return Iterables.transform( Iterables.filter( @@ -1062,7 +1083,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment safeSubmit( () -> { ParallelSkyQueryUtils.getRBuildFilesParallel( - SkyQueryEnvironment.this, fileIdentifiers, callback, packageSemaphore); + SkyQueryEnvironment.this, fileIdentifiers, callback); return null; })); } @@ -1103,14 +1124,12 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment } if (resultKeys.size() >= BATCH_CALLBACK_SIZE) { for (Iterable batch : Iterables.partition(resultKeys, BATCH_CALLBACK_SIZE)) { - callback.process( - getBuildFilesForPackageValues(graph.getSuccessfulValues(batch).values())); + getBuildFileTargetsForPackageKeysAndProcessViaCallback(batch, callback); } resultKeys.clear(); } } - callback.process( - getBuildFilesForPackageValues(graph.getSuccessfulValues(resultKeys).values())); + getBuildFileTargetsForPackageKeysAndProcessViaCallback(resultKeys, callback); return immediateSuccessfulFuture(null); } catch (QueryException e) { return immediateFailedFuture(e); @@ -1189,6 +1208,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment // TODO(nharmata): For queries with less than {@code batchThreshold} results, this batching // strategy probably hurts performance since we can only start formatting results once the entire // query is finished. + // TODO(nharmata): This batching strategy is also potentially harmful from a memory perspective + // since when the Targets being output are backed by Package instances, we're delaying GC of the + // Package instances until the output batch size is met. private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback implements Callback { -- cgit v1.2.3