// Copyright 2016 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.query2; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.collect.CompactHashSet; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.concurrent.BlockingStack; import com.google.devtools.build.lib.concurrent.ErrorClassifier; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; import com.google.devtools.build.lib.concurrent.QuiescingExecutor; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; import com.google.devtools.build.lib.query2.engine.Uniquifier; import com.google.devtools.build.lib.query2.engine.VariableContext; import com.google.devtools.build.lib.skyframe.PackageValue; import com.google.devtools.build.lib.skyframe.SkyFunctions; import com.google.devtools.build.lib.util.Pair; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.skyframe.SkyKey; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Parallel implementations of various functionality in {@link SkyQueryEnvironment}. * *

Special attention is given to memory usage. Naive parallel implementations of query * functionality would lead to memory blowup. Instead of dealing with {@link Target}s, we try to * deal with {@link SkyKey}s as much as possible to reduce the number of {@link Package}s forcibly * in memory at any given time. */ // TODO(bazel-team): Be more deliberate about bounding memory usage here. class ParallelSkyQueryUtils { /** The maximum number of keys to visit at once. */ @VisibleForTesting static final int VISIT_BATCH_SIZE = 10000; private ParallelSkyQueryUtils() { } /** * Specialized parallel variant of {@link SkyQueryEnvironment#getAllRdeps} that is appropriate * when there is no depth-bound. */ static QueryTaskFuture getAllRdepsUnboundedParallel( SkyQueryEnvironment env, QueryExpression expression, VariableContext context, Callback callback, MultisetSemaphore packageSemaphore) { return env.eval( expression, context, new SkyKeyBFSVisitorCallback( new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore))); } /** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */ static void getRBuildFilesParallel( SkyQueryEnvironment env, Collection fileIdentifiers, Callback callback, MultisetSemaphore packageSemaphore) throws QueryException, InterruptedException { Uniquifier keyUniquifier = env.createSkyKeyUniquifier(); RBuildFilesVisitor visitor = new RBuildFilesVisitor(env, keyUniquifier, callback, packageSemaphore); visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers)); } /** A helper class that computes 'rbuildfiles()' via BFS. */ private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor { private final MultisetSemaphore packageSemaphore; private RBuildFilesVisitor( SkyQueryEnvironment env, Uniquifier uniquifier, Callback callback, MultisetSemaphore packageSemaphore) { super(env, uniquifier, callback); this.packageSemaphore = packageSemaphore; } @Override protected Visit getVisitResult(Iterable values) throws InterruptedException { Collection> reverseDeps = env.graph.getReverseDeps(values).values(); Set keysToUseForResult = CompactHashSet.create(); Set keysToVisitNext = CompactHashSet.create(); for (SkyKey rdep : Iterables.concat(reverseDeps)) { if (rdep.functionName().equals(SkyFunctions.PACKAGE)) { keysToUseForResult.add(rdep); // Every package has a dep on the external package, so we need to include those edges too. if (rdep.equals(PackageValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER))) { keysToVisitNext.add(rdep); } } else if (!rdep.functionName().equals(SkyFunctions.PACKAGE_LOOKUP)) { // Packages may depend on the existence of subpackages, but these edges aren't relevant to // rbuildfiles. keysToVisitNext.add(rdep); } } return new Visit(keysToUseForResult, keysToVisitNext); } @Override protected void processResultantTargets( Iterable keysToUseForResult, Callback callback) throws QueryException, InterruptedException { Set pkgIdsNeededForResult = ImmutableSet.copyOf( Iterables.transform( keysToUseForResult, SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)); packageSemaphore.acquireAll(pkgIdsNeededForResult); try { callback.process(SkyQueryEnvironment.getBuildFilesForPackageValues( env.graph.getSuccessfulValues(keysToUseForResult).values())); } finally { packageSemaphore.releaseAll(pkgIdsNeededForResult); } } @Override protected Iterable preprocessInitialVisit(Iterable keys) { return keys; } } /** * A helper class that computes 'allrdeps()' via BFS. * *

The visitor uses a pair of to keep track the nodes to visit and avoid * dealing with targetification of reverse deps until they are needed. The node itself is needed * to filter out disallowed deps later. Compared against the approach using a single SkyKey, it * consumes 16 more bytes in a 64-bit environment for each edge. However it defers the need to * load all the packages which have at least a target as a rdep of the current batch, thus greatly * reduces the risk of OOMs. The additional memory usage should not be a large concern here, as * even with 10M edges, the memory overhead is around 160M, and the memory can be reclaimed by * regular GC. */ private static class AllRdepsUnboundedVisitor extends AbstractSkyKeyBFSVisitor> { private final MultisetSemaphore packageSemaphore; private AllRdepsUnboundedVisitor( SkyQueryEnvironment env, Uniquifier> uniquifier, Callback callback, MultisetSemaphore packageSemaphore) { super(env, uniquifier, callback); this.packageSemaphore = packageSemaphore; } /** * A {@link Factory} for {@link AllRdepsUnboundedVisitor} instances, each of which will be used * to perform visitation of the reverse transitive closure of the {@link Target}s passed in a * single {@link Callback#process} call. Note that all the created instances share the same * {@link Uniquifier} so that we don't visit the same Skyframe node more than once. */ private static class Factory implements AbstractSkyKeyBFSVisitor.Factory { private final SkyQueryEnvironment env; private final Uniquifier> uniquifier; private final Callback callback; private final MultisetSemaphore packageSemaphore; private Factory( SkyQueryEnvironment env, Callback callback, MultisetSemaphore packageSemaphore) { this.env = env; this.uniquifier = env.createReverseDepSkyKeyUniquifier(); this.callback = callback; this.packageSemaphore = packageSemaphore; } @Override public AbstractSkyKeyBFSVisitor> create() { return new AllRdepsUnboundedVisitor(env, uniquifier, callback, packageSemaphore); } } @Override protected Visit getVisitResult(Iterable> keys) throws InterruptedException { Collection filteredKeys = new ArrayList<>(); // Build a raw reverse dep map from pairs of SkyKeys to filter out the disallowed deps. Map> reverseDepsMap = Maps.newHashMap(); for (Pair reverseDepPair : keys) { // First-level nodes do not have a parent node (they may have one in Skyframe but we do not // need to retrieve them. if (reverseDepPair.first == null) { filteredKeys.add(Preconditions.checkNotNull(reverseDepPair.second)); continue; } if (!reverseDepsMap.containsKey(reverseDepPair.first)) { reverseDepsMap.put(reverseDepPair.first, new LinkedList()); } reverseDepsMap.get(reverseDepPair.first).add(reverseDepPair.second); } Multimap packageKeyToTargetKeyMap = env.makePackageKeyToTargetKeyMap(Iterables.concat(reverseDepsMap.values())); Set pkgIdsNeededForTargetification = ImmutableSet.copyOf( Iterables.transform( packageKeyToTargetKeyMap.keySet(), SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)); packageSemaphore.acquireAll(pkgIdsNeededForTargetification); try { // Filter out disallowed deps. We cannot defer the targetification any further as we do not // want to retrieve the rdeps of unwanted nodes (targets). if (!reverseDepsMap.isEmpty()) { Collection filteredTargets = env.filterRawReverseDepsOfTransitiveTraversalKeys( reverseDepsMap, packageKeyToTargetKeyMap); filteredKeys.addAll( Collections2.transform(filteredTargets, SkyQueryEnvironment.TARGET_TO_SKY_KEY)); } } finally { packageSemaphore.releaseAll(pkgIdsNeededForTargetification); } // Retrieve the reverse deps as SkyKeys and defer the targetification and filtering to next // recursive visitation. Map> unfilteredReverseDeps = env.graph.getReverseDeps(filteredKeys); ImmutableList.Builder> builder = ImmutableList.builder(); for (Map.Entry> rdeps : unfilteredReverseDeps.entrySet()) { for (SkyKey rdep : rdeps.getValue()) { Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep); if (label != null) { builder.add(Pair.of(rdeps.getKey(), rdep)); } } } return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ builder.build()); } @Override protected void processResultantTargets( Iterable keysToUseForResult, Callback callback) throws QueryException, InterruptedException { Multimap packageKeyToTargetKeyMap = env.makePackageKeyToTargetKeyMap(keysToUseForResult); Set pkgIdsNeededForResult = ImmutableSet.copyOf( Iterables.transform( packageKeyToTargetKeyMap.keySet(), SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER)); packageSemaphore.acquireAll(pkgIdsNeededForResult); try { callback.process( env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values()); } finally { packageSemaphore.releaseAll(pkgIdsNeededForResult); } } @Override protected Iterable> preprocessInitialVisit(Iterable keys) { return Iterables.transform( keys, new Function>() { @Override public Pair apply(SkyKey key) { // Set parent of first-level nodes to null. They are handled specially in // AllRdepsUnboundedVisitor#getVisitResult and will not be filtered later. return Pair.of(null, key); } }); } @Override protected Iterable getVisitTasks(Collection> pendingKeysToVisit) { // Group pending visits by package. ListMultimap> visitsByPackage = ArrayListMultimap.create(); for (Pair visit : pendingKeysToVisit) { Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(visit.second); if (label != null) { visitsByPackage.put(label.getPackageIdentifier(), visit); } } ImmutableList.Builder builder = ImmutableList.builder(); // A couple notes here: // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we // want. // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid // accidentally retaining the entire ArrayListMultimap object. for (Iterable> keysToVisitBatch : Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) { builder.add(new VisitTask(keysToVisitBatch)); } return builder.build(); } } /** * A {@link Callback} whose {@link Callback#process} method kicks off a BFS visitation via a fresh * {@link AbstractSkyKeyBFSVisitor} instance. */ private static class SkyKeyBFSVisitorCallback implements Callback { private final AbstractSkyKeyBFSVisitor.Factory visitorFactory; private SkyKeyBFSVisitorCallback(AbstractSkyKeyBFSVisitor.Factory visitorFactory) { this.visitorFactory = visitorFactory; } @Override public void process(Iterable partialResult) throws QueryException, InterruptedException { AbstractSkyKeyBFSVisitor visitor = visitorFactory.create(); // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on // another, potentially expensive computation. Refactor to something like "processAsync". visitor.visitAndWaitForCompletion( SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult)); } } /** * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link * QuiescingExecutor}. * *

The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool * NOT part of the global query evaluation pool to avoid starvation. */ @ThreadSafe private abstract static class AbstractSkyKeyBFSVisitor { protected final SkyQueryEnvironment env; private final Uniquifier uniquifier; private final Callback callback; private final BFSVisitingTaskExecutor executor; /** A queue to store pending visits. */ private final LinkedBlockingQueue processingQueue = new LinkedBlockingQueue<>(); /** * The max time interval between two scheduling passes in milliseconds. A scheduling pass is * defined as the scheduler thread determining whether to drain all pending visits from the * queue and submitting tasks to perform the visits. * *

The choice of 1ms is a result based of experiments. It is an attempted balance due to a * few facts about the scheduling interval: * *

1. A large interval adds systematic delay. In an extreme case, a BFS visit which is * supposed to take only 1ms now may take 5ms. For most BFS visits which take longer than a few * hundred milliseconds, it should not be noticeable. * *

2. A zero-interval config eats too much CPU. * *

Even though the scheduler runs once every 1 ms, it does not try to drain it every time. * Pending visits are drained only certain criteria are met. */ private static final long SCHEDULING_INTERVAL_MILLISECONDS = 1; /** * The minimum number of pending tasks the scheduler tries to hit. The 3x number is set based on * experiments. We do not want to schedule tasks too frequently to miss the benefits of large * number of keys being grouped by packages. On the other hand, we want to keep all threads in * the pool busy to achieve full capacity. A low number here will cause some of the worker * threads to go idle at times before the next scheduling cycle. * *

TODO(shazh): Revisit the choice of task target based on real-prod performance. */ private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT; /** * Fail fast on RuntimeExceptions, including {code RuntimeInterruptedException} and {@code * RuntimeQueryException}, which are resulted from InterruptedException and QueryException. */ static final ErrorClassifier SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER = new ErrorClassifier() { @Override protected ErrorClassification classifyException(Exception e) { return (e instanceof RuntimeException) ? ErrorClassification.CRITICAL_AND_LOG : ErrorClassification.NOT_CRITICAL; } }; /** All BFS visitors share a single global fixed thread pool. */ private static final ExecutorService FIXED_THREAD_POOL_EXECUTOR = new ThreadPoolExecutor( // Must be at least 2 worker threads in the pool (1 for the scheduler thread). /*corePoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT), /*maximumPoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT), /*keepAliveTime=*/ 1, /*units=*/ TimeUnit.SECONDS, /*workQueue=*/ new BlockingStack(), new ThreadFactoryBuilder().setNameFormat("skykey-bfs-visitor %d").build()); private AbstractSkyKeyBFSVisitor( SkyQueryEnvironment env, Uniquifier uniquifier, Callback callback) { this.env = env; this.uniquifier = uniquifier; this.callback = callback; this.executor = new BFSVisitingTaskExecutor( FIXED_THREAD_POOL_EXECUTOR, SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER); } /** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */ private static interface Factory { AbstractSkyKeyBFSVisitor create(); } protected final class Visit { private final Iterable keysToUseForResult; private final Iterable keysToVisit; private Visit(Iterable keysToUseForResult, Iterable keysToVisit) { this.keysToUseForResult = keysToUseForResult; this.keysToVisit = keysToVisit; } } void visitAndWaitForCompletion(Iterable keys) throws QueryException, InterruptedException { processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys))); executor.bfsVisitAndWaitForCompletion(); } /** * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in * the full visitation to the given {@link Callback}. */ protected abstract void processResultantTargets( Iterable keysToUseForResult, Callback callback) throws QueryException, InterruptedException; /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */ protected abstract Visit getVisitResult(Iterable values) throws InterruptedException; /** Gets the first {@link Visit} representing the entry-level SkyKeys. */ protected abstract Iterable preprocessInitialVisit(Iterable keys); protected Iterable getVisitTasks(Collection pendingKeysToVisit) { ImmutableList.Builder builder = ImmutableList.builder(); for (Iterable keysToVisitBatch : Iterables.partition(pendingKeysToVisit, VISIT_BATCH_SIZE)) { builder.add(new VisitTask(keysToVisitBatch)); } return builder.build(); } abstract static class Task implements Runnable { @Override public void run() { try { process(); } catch (QueryException e) { throw new RuntimeQueryException(e); } catch (InterruptedException e) { throw new RuntimeInterruptedException(e); } } abstract void process() throws QueryException, InterruptedException; } class VisitTask extends Task { private final Iterable keysToVisit; private VisitTask(Iterable keysToVisit) { this.keysToVisit = keysToVisit; } @Override void process() throws InterruptedException { ImmutableList uniqueKeys = uniquifier.unique(keysToVisit); if (uniqueKeys.isEmpty()) { return; } Visit visit = getVisitResult(uniqueKeys); for (Iterable keysToUseForResultBatch : Iterables.partition( visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) { executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch)); } processingQueue.addAll(ImmutableList.copyOf(visit.keysToVisit)); } } private class GetAndProcessResultsTask extends Task { private final Iterable keysToUseForResult; private GetAndProcessResultsTask(Iterable keysToUseForResult) { this.keysToUseForResult = keysToUseForResult; } @Override protected void process() throws QueryException, InterruptedException { processResultantTargets(keysToUseForResult, callback); } } /** * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and * scheduler for parallel BFS visitations. */ private class BFSVisitingTaskExecutor extends AbstractQueueVisitor { private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) { super( /*concurrent=*/ true, /*executorService=*/ executor, // Leave the thread pool active for other current and future callers. /*shutdownOnCompletion=*/ false, /*failFastOnException=*/ true, /*errorClassifier=*/ errorClassifier); } private void bfsVisitAndWaitForCompletion() throws QueryException, InterruptedException { // The scheduler keeps running until either of the following two conditions are met. // // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail // fast. // 2. There is no pending visit in the queue and no pending task running. while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) { // To achieve maximum efficiency, queue is drained in either of the following two // conditions: // // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU. // 2. The process queue size is large. if (getTaskCount() < MIN_PENDING_TASKS || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) { Collection pendingKeysToVisit = new ArrayList<>(processingQueue.size()); processingQueue.drainTo(pendingKeysToVisit); for (Task task : getVisitTasks(pendingKeysToVisit)) { execute(task); } } try { Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS); } catch (InterruptedException e) { // If the main thread waiting for completion of the visitation is interrupted, we should // gracefully terminate all running and pending tasks before exit. If QueryException // occured in any of the worker thread, awaitTerminationAndPropagateErrorsIfAny // propagates the QueryException instead of InterruptedException. setInterrupted(); awaitTerminationAndPropagateErrorsIfAny(); throw e; } } // We reach here either because the visitation is complete, or because an error prevents us // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either // gracefully exit if the visitation is complete, or propagate the exception if error // occurred. awaitTerminationAndPropagateErrorsIfAny(); } private void awaitTerminationAndPropagateErrorsIfAny() throws QueryException, InterruptedException { try { awaitTermination(/*interruptWorkers=*/ true); } catch (RuntimeQueryException e) { throw (QueryException) e.getCause(); } catch (RuntimeInterruptedException e) { throw (InterruptedException) e.getCause(); } } } } private static class RuntimeQueryException extends RuntimeException { private RuntimeQueryException(QueryException queryException) { super(queryException); } } private static class RuntimeInterruptedException extends RuntimeException { private RuntimeInterruptedException(InterruptedException interruptedException) { super(interruptedException); } } }