diff options
author | 2018-05-25 11:12:11 -0700 | |
---|---|---|
committer | 2018-05-25 11:13:54 -0700 | |
commit | 2643d4b7543403eae52c038e769231f539938195 (patch) | |
tree | 9a6ddcec6b099af4187faa0e86c127e0001ba4d0 /src/main/java/com/google/devtools/build/lib/query2 | |
parent | af18c36dc46bfe2334fca442f6d8cc9f6943fb7c (diff) |
Implement unbounded deps() using ParallelVisitor for SkyQuery.
PiperOrigin-RevId: 198074986
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2')
6 files changed, 257 insertions, 29 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/AbstractEdgeVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/AbstractEdgeVisitor.java index dd689f0f06..6dd5818c9d 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/AbstractEdgeVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/query2/AbstractEdgeVisitor.java @@ -50,6 +50,12 @@ abstract class AbstractEdgeVisitor<T> extends ParallelVisitor<T, Target> { protected void processPartialResults( Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) throws QueryException, InterruptedException { + processResultsAndReturnTargets(keysToUseForResult, callback); + } + + protected Iterable<Target> processResultsAndReturnTargets( + Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + throws QueryException, InterruptedException { Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = env.makePackageKeyToTargetKeyMap(keysToUseForResult); Set<PackageIdentifier> pkgIdsNeededForResult = @@ -59,12 +65,14 @@ abstract class AbstractEdgeVisitor<T> extends ParallelVisitor<T, Target> { .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) .collect(toImmutableSet()); packageSemaphore.acquireAll(pkgIdsNeededForResult); + Iterable<Target> targets; try { - callback.process( - env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values()); + targets = env.makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap).values(); + callback.process(targets); } finally { packageSemaphore.releaseAll(pkgIdsNeededForResult); } + return targets; } protected abstract SkyKey getNewNodeFromEdge(T visit); diff --git a/src/main/java/com/google/devtools/build/lib/query2/DepsUnboundedVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/DepsUnboundedVisitor.java new file mode 100644 index 0000000000..55adaacdfe --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/query2/DepsUnboundedVisitor.java @@ -0,0 +1,163 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.query2; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.devtools.build.lib.query2.SkyQueryEnvironment.IS_TTV; +import static com.google.devtools.build.lib.query2.SkyQueryEnvironment.SKYKEY_TO_LABEL; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import com.google.devtools.build.lib.cmdline.Label; +import com.google.devtools.build.lib.cmdline.PackageIdentifier; +import com.google.devtools.build.lib.concurrent.MultisetSemaphore; +import com.google.devtools.build.lib.events.Event; +import com.google.devtools.build.lib.packages.Target; +import com.google.devtools.build.lib.query2.engine.Callback; +import com.google.devtools.build.lib.query2.engine.QueryException; +import com.google.devtools.build.lib.query2.engine.Uniquifier; +import com.google.devtools.build.skyframe.SkyKey; +import java.util.Map; +import java.util.Set; + +/** + * A helper class that computes unbounded 'deps(<expr>)' via BFS. Re-use logic from {@link + * AbstractEdgeVisitor} to grab relevant semaphore locks when processing nodes as well as batching + * visits by packages. + */ +class DepsUnboundedVisitor extends AbstractEdgeVisitor<SkyKey> { + /** + * A {@link Uniquifier} for valid deps. Only used prior to visiting the deps. Deps filtering is + * done in the {@link DepsUnboundedVisitor#getVisitResult} stage. + */ + private final Uniquifier<SkyKey> validDepUniquifier; + + private final boolean depsNeedFiltering; + private final Callback<Target> errorReporter; + + DepsUnboundedVisitor( + SkyQueryEnvironment env, + Uniquifier<SkyKey> validDepUniquifier, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore, + boolean depsNeedFiltering, + Callback<Target> errorReporter) { + super(env, callback, packageSemaphore); + this.validDepUniquifier = validDepUniquifier; + this.depsNeedFiltering = depsNeedFiltering; + this.errorReporter = errorReporter; + } + + /** + * A {@link Factory} for {@link DepsUnboundedVisitor} instances, each of which will be used to + * perform visitation of the DTC of the {@link SkyKey}s passed in a single {@link + * Callback#process} call. Note that all the created instances share the same {@link Uniquifier} + * so that we don't visit the same Skyframe node more than once. + */ + static class Factory implements ParallelVisitor.Factory { + private final SkyQueryEnvironment env; + private final Uniquifier<SkyKey> validDepUniquifier; + private final Callback<Target> callback; + private final MultisetSemaphore<PackageIdentifier> packageSemaphore; + private final boolean depsNeedFiltering; + private final Callback<Target> errorReporter; + + Factory( + SkyQueryEnvironment env, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore, + boolean depsNeedFiltering, + Callback<Target> errorReporter) { + this.env = env; + this.validDepUniquifier = env.createSkyKeyUniquifier(); + this.callback = callback; + this.packageSemaphore = packageSemaphore; + this.depsNeedFiltering = depsNeedFiltering; + this.errorReporter = errorReporter; + } + + @Override + public ParallelVisitor<SkyKey, Target> create() { + return new DepsUnboundedVisitor( + env, validDepUniquifier, callback, packageSemaphore, depsNeedFiltering, errorReporter); + } + } + + @Override + protected Visit getVisitResult(Iterable<SkyKey> keys) throws InterruptedException { + if (depsNeedFiltering) { + // We have to targetify the keys here in order to determine the allowed dependencies. + Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap = env.makePackageKeyToTargetKeyMap(keys); + Set<PackageIdentifier> pkgIdsNeededForTargetification = + packageKeyToTargetKeyMap + .keySet() + .stream() + .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) + .collect(toImmutableSet()); + packageSemaphore.acquireAll(pkgIdsNeededForTargetification); + Iterable<Target> deps; + try { + deps = env.getFwdDeps(env.makeTargetsFromSkyKeys(keys).values()); + } finally { + packageSemaphore.releaseAll(pkgIdsNeededForTargetification); + } + + return new Visit( + /*keysToUseForResult=*/ keys, + /*keysToVisit=*/ Iterables.transform(deps, Target::getLabel)); + } + + // We need to explicitly check that all requested TTVs are actually in the graph. + Map<SkyKey, Iterable<SkyKey>> depMap = env.graph.getDirectDeps(keys); + checkIfMissingTargets(keys, depMap); + Iterable<SkyKey> deps = Iterables.filter(Iterables.concat(depMap.values()), IS_TTV); + return new Visit(keys, deps); + } + + private void checkIfMissingTargets(Iterable<SkyKey> keys, Map<SkyKey, Iterable<SkyKey>> depMap) { + if (depMap.size() != Iterables.size(keys)) { + Iterable<Label> missingTargets = + Iterables.transform( + Iterables.filter(keys, Predicates.not(Predicates.in(depMap.keySet()))), + SKYKEY_TO_LABEL); + env.getEventHandler() + .handle(Event.warn("Targets were missing from graph: " + missingTargets)); + } + } + + @Override + protected Iterable<SkyKey> preprocessInitialVisit(Iterable<SkyKey> keys) { + return keys; + } + + @Override + protected void processPartialResults( + Iterable<SkyKey> keysToUseForResult, Callback<Target> callback) + throws QueryException, InterruptedException { + errorReporter.process(processResultsAndReturnTargets(keysToUseForResult, callback)); + } + + @Override + protected SkyKey getNewNodeFromEdge(SkyKey visit) { + return visit; + } + + @Override + protected ImmutableList<SkyKey> getUniqueValues(Iterable<SkyKey> keysToVisit) { + // Legit deps are already filtered using env.getFwdDeps(). + return validDepUniquifier.unique(keysToVisit); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java index 516d2e6f84..1fe3bbf97f 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java +++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java @@ -170,6 +170,22 @@ public class ParallelSkyQueryUtils { visitor.visitAndWaitForCompletion(env.getFileStateKeysForFileFragments(fileIdentifiers)); } + static QueryTaskFuture<Void> getDepsUnboundedParallel( + SkyQueryEnvironment env, + QueryExpression expression, + VariableContext<Target> context, + Callback<Target> callback, + MultisetSemaphore<PackageIdentifier> packageSemaphore, + boolean depsNeedFiltering, + Callback<Target> errorReporter) { + return env.eval( + expression, + context, + ParallelVisitor.createParallelVisitorCallback( + new DepsUnboundedVisitor.Factory( + env, callback, packageSemaphore, depsNeedFiltering, errorReporter))); + } + static class DepAndRdep { @Nullable final SkyKey dep; final SkyKey rdep; diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index ea07744ab9..fb2281a2aa 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -508,7 +508,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> * only when {@link SkyQueryEnvironment#dependencyFilter} is set to {@link * DependencyFilter#ALL_DEPS}. */ - Multimap<SkyKey, SkyKey> getDirectDepsOfSkyKeys(Iterable<SkyKey> keys) + public Multimap<SkyKey, SkyKey> getDirectDepsOfSkyKeys(Iterable<SkyKey> keys) throws InterruptedException { Preconditions.checkState(dependencyFilter == DependencyFilter.ALL_DEPS, dependencyFilter); ImmutableMultimap.Builder<SkyKey, SkyKey> builder = ImmutableMultimap.builder(); @@ -668,7 +668,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> } @ThreadSafe - protected Uniquifier<SkyKey> createSkyKeyUniquifier() { + public Uniquifier<SkyKey> createSkyKeyUniquifier() { return new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT); } @@ -906,11 +906,15 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> // so no preloading of target patterns is necessary. } - static final Predicate<SkyKey> IS_TTV = SkyFunctionName.functionIs(Label.TRANSITIVE_TRAVERSAL); + public ExtendedEventHandler getEventHandler() { + return eventHandler; + } - static final Function<SkyKey, Label> SKYKEY_TO_LABEL = - skyKey -> IS_TTV.apply(skyKey) ? (Label) skyKey.argument() : null; + public static final Predicate<SkyKey> IS_TTV = + SkyFunctionName.functionIs(Label.TRANSITIVE_TRAVERSAL); + public static final Function<SkyKey, Label> SKYKEY_TO_LABEL = + skyKey -> IS_TTV.apply(skyKey) ? (Label) skyKey.argument() : null; static final Function<SkyKey, PackageIdentifier> PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER = skyKey -> (PackageIdentifier) skyKey.argument(); @@ -1255,6 +1259,22 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> this, expression, universePredicate, context, callback, packageSemaphore)); } + @Override + public QueryTaskFuture<Void> getDepsUnboundedParallel( + QueryExpression expression, + VariableContext<Target> context, + Callback<Target> callback, + Callback<Target> errorReporter) { + return ParallelSkyQueryUtils.getDepsUnboundedParallel( + SkyQueryEnvironment.this, + expression, + context, + callback, + packageSemaphore, + /*depsNeedFiltering=*/ !dependencyFilter.equals(DependencyFilter.ALL_DEPS), + errorReporter); + } + @ThreadSafe @Override public QueryTaskFuture<Void> getRdepsBoundedParallel( diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java index de4cd341dd..803c975d90 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java @@ -58,31 +58,46 @@ final class DepsFunction implements QueryFunction { final QueryExpression expression, List<Argument> args, final Callback<T> callback) { + QueryExpression queryExpression = args.get(0).getExpression(); + if (env instanceof StreamableQueryEnvironment && args.size() == 1) { + StreamableQueryEnvironment<T> streamableEnv = (StreamableQueryEnvironment<T>) env; + return streamableEnv.getDepsUnboundedParallel( + queryExpression, + context, + callback, + targets -> { + ThreadSafeMutableSet<T> set = env.createThreadSafeMutableSet(); + Iterables.addAll(set, targets); + env.buildTransitiveClosure(expression, set, /*maxDepth=*/ 1); + }); + } + final int depthBound = args.size() > 1 ? args.get(1).getInteger() : Integer.MAX_VALUE; final MinDepthUniquifier<T> minDepthUniquifier = env.createMinDepthUniquifier(); - return env.eval(args.get(0).getExpression(), context, new Callback<T>() { - @Override - public void process(Iterable<T> partialResult) throws QueryException, InterruptedException { - ThreadSafeMutableSet<T> current = env.createThreadSafeMutableSet(); - Iterables.addAll(current, partialResult); - env.buildTransitiveClosure(expression, current, depthBound); + return env.eval( + queryExpression, + context, + partialResult -> { + ThreadSafeMutableSet<T> current = env.createThreadSafeMutableSet(); + Iterables.addAll(current, partialResult); + env.buildTransitiveClosure(expression, current, depthBound); - // We need to iterate depthBound + 1 times. - for (int i = 0; i <= depthBound; i++) { - // Filter already visited nodes: if we see a node in a later round, then we don't need to - // visit it again, because the depth at which we see it at must be greater than or equal - // to the last visit. - ImmutableList<T> toProcess = - minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(current, i); - callback.process(toProcess); - current = env.createThreadSafeMutableSet(); - Iterables.addAll(current, env.getFwdDeps(toProcess)); - if (current.isEmpty()) { - // Exit when there are no more nodes to visit. - break; + // We need to iterate depthBound + 1 times. + for (int i = 0; i <= depthBound; i++) { + // Filter already visited nodes: if we see a node in a later round, then we don't need + // to + // visit it again, because the depth at which we see it at must be greater than or equal + // to the last visit. + ImmutableList<T> toProcess = + minDepthUniquifier.uniqueAtDepthLessThanOrEqualTo(current, i); + callback.process(toProcess); + current = env.createThreadSafeMutableSet(); + Iterables.addAll(current, env.getFwdDeps(toProcess)); + if (current.isEmpty()) { + // Exit when there are no more nodes to visit. + break; + } } - } - } - }); + }); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java index ef34742e58..f36bee0063 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/StreamableQueryEnvironment.java @@ -42,4 +42,10 @@ public interface StreamableQueryEnvironment<T> extends QueryEnvironment<T> { QueryExpression universe, VariableContext<T> context, Callback<T> callback); + + QueryTaskFuture<Void> getDepsUnboundedParallel( + QueryExpression expression, + VariableContext<T> context, + Callback<T> callback, + Callback<T> errorReporter); } |