// Copyright 2016 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.query2; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ThreadSafeMutableSet; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; import com.google.devtools.build.lib.query2.engine.QueryExpressionContext; import com.google.devtools.build.lib.query2.engine.QueryUtil; import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback; import com.google.devtools.build.lib.query2.engine.Uniquifier; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.skyframe.SkyKey; import java.util.Collection; import 
java.util.Collections; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; /** * Parallel implementations of various functionality in {@link SkyQueryEnvironment}. * *

Special attention is given to memory usage. Naive parallel implementations of query
 * functionality would lead to memory blowup. Instead of dealing with {@link Target}s, we try to
 * deal with {@link SkyKey}s as much as possible to reduce the number of {@link Package}s forcibly
 * in memory at any given time.
 */
// TODO(bazel-team): Be more deliberate about bounding memory usage here.
public class ParallelSkyQueryUtils {

  /** The maximum number of keys to visit at once. */
  @VisibleForTesting static final int VISIT_BATCH_SIZE = 10000;

  /** Static utility holder; never instantiated. */
  private ParallelSkyQueryUtils() {
  }

  /**
   * Evaluates {@code expression} and feeds its results to a parallel visitor callback created from
   * {@code RdepsUnboundedVisitor.Factory}, with an always-true universe predicate (i.e. no
   * universe restriction is applied here).
   *
   * @param env the environment used both to evaluate the expression and to build the visitor
   * @param expression the expression whose results seed the visitation
   * @param context the evaluation context passed through to {@code env.eval}
   * @param callback receives the results produced by the visitor
   * @param packageSemaphore handed to the visitor factory unchanged
   * @return the future returned by {@code env.eval}
   */
  static QueryTaskFuture<Void> getAllRdepsUnboundedParallel(
      SkyQueryEnvironment env,
      QueryExpression expression,
      QueryExpressionContext<Target> context,
      Callback<Target> callback,
      MultisetSemaphore<PackageIdentifier> packageSemaphore) {
    return env.eval(
        expression,
        context,
        ParallelVisitor.createParallelVisitorCallback(
            new RdepsUnboundedVisitor.Factory(
                env,
                /*unfilteredUniverse=*/ Predicates.alwaysTrue(),
                callback,
                packageSemaphore)));
  }

  /**
   * Depth-limited counterpart of {@link #getAllRdepsUnboundedParallel}: evaluates {@code
   * expression} and feeds its results to a parallel visitor callback created from {@code
   * RdepsBoundedVisitor.Factory}, with an always-true universe predicate.
   *
   * @param depth the depth bound passed to the visitor factory
   * @return the future returned by {@code env.eval}
   */
  static QueryTaskFuture<Void> getAllRdepsBoundedParallel(
      SkyQueryEnvironment env,
      QueryExpression expression,
      int depth,
      QueryExpressionContext<Target> context,
      Callback<Target> callback,
      MultisetSemaphore<PackageIdentifier> packageSemaphore) {
    return env.eval(
        expression,
        context,
        ParallelVisitor.createParallelVisitorCallback(
            new RdepsBoundedVisitor.Factory(
                env,
                depth,
                /*universe=*/ Predicates.alwaysTrue(),
                callback,
                packageSemaphore)));
  }

  /**
   * Same as {@link #getAllRdepsUnboundedParallel}, except that the caller-supplied {@code
   * unfilteredUniverse} predicate is passed to {@code RdepsUnboundedVisitor.Factory} instead of an
   * always-true predicate.
   *
   * @param unfilteredUniverse predicate over {@link SkyKey}s handed to the visitor factory
   * @return the future returned by {@code env.eval}
   */
  static QueryTaskFuture<Void> getRdepsInUniverseUnboundedParallel(
      SkyQueryEnvironment env,
      QueryExpression expression,
      Predicate<SkyKey> unfilteredUniverse,
      QueryExpressionContext<Target> context,
      Callback<Target> callback,
      MultisetSemaphore<PackageIdentifier> packageSemaphore) {
    return env.eval(
        expression,
        context,
        ParallelVisitor.createParallelVisitorCallback(
            new RdepsUnboundedVisitor.Factory(
                env, unfilteredUniverse, callback, packageSemaphore)));
  }

  /**
   * Returns a future for a membership predicate over the {@link SkyKey}s collected while visiting
   * the value of {@code expression}.
   *
   * <p>The expression is first fully evaluated via {@code QueryUtil.evalAll}. Its value is then
   * processed asynchronously by a visitor callback created from {@code
   * UnfilteredSkyKeyTTVDTCVisitor.Factory}; all keys reported by that visitor are aggregated into
   * a thread-safe set, and the resulting predicate is {@code Predicates.in} over an immutable copy
   * of that set.
   *
   * @param processResultsBatchSize passed to the visitor factory unchanged
   * @param concurrencyLevel used to size the concurrent set that aggregates the visited keys
   */
  static QueryTaskFuture<Predicate<SkyKey>> getDTCSkyKeyPredicateFuture(
      SkyQueryEnvironment env,
      QueryExpression expression,
      QueryExpressionContext<Target> context,
      int processResultsBatchSize,
      int concurrencyLevel) {
    QueryTaskFuture<ThreadSafeMutableSet<Target>> universeValueFuture =
        QueryUtil.evalAll(env, context, expression);

    Function<ThreadSafeMutableSet<Target>, QueryTaskFuture<Predicate<SkyKey>>>
        getTransitiveClosureAsyncFunction =
            universeValue -> {
              ThreadSafeAggregateAllSkyKeysCallback aggregateAllCallback =
                  new ThreadSafeAggregateAllSkyKeysCallback(concurrencyLevel);
              return env.executeAsync(
                  () -> {
                    Callback<Target> visitorCallback =
                        ParallelVisitor.createParallelVisitorCallback(
                            new UnfilteredSkyKeyTTVDTCVisitor.Factory(
                                env,
                                env.createSkyKeyUniquifier(),
                                processResultsBatchSize,
                                aggregateAllCallback));
                    visitorCallback.process(universeValue);
                    // The aggregate is only read after process() has returned.
                    return Predicates.in(aggregateAllCallback.getResult());
                  });
            };

    return env.transformAsync(universeValueFuture, getTransitiveClosureAsyncFunction);
  }

  /**
   * Same as {@link #getAllRdepsBoundedParallel}, except that the caller-supplied {@code universe}
   * predicate is passed to {@code RdepsBoundedVisitor.Factory} instead of an always-true
   * predicate.
   *
   * @param depth the depth bound passed to the visitor factory
   * @param universe predicate over {@link SkyKey}s handed to the visitor factory
   * @return the future returned by {@code env.eval}
   */
  static QueryTaskFuture<Void> getRdepsInUniverseBoundedParallel(
      SkyQueryEnvironment env,
      QueryExpression expression,
      int depth,
      Predicate<SkyKey> universe,
      QueryExpressionContext<Target> context,
      Callback<Target> callback,
      MultisetSemaphore<PackageIdentifier> packageSemaphore) {
    return env.eval(
        expression,
        context,
        ParallelVisitor.createParallelVisitorCallback(
            new RdepsBoundedVisitor.Factory(
                env, depth, universe, callback, packageSemaphore)));
  }

  /**
   * Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}.
   *
   * <p>Creates a fresh key uniquifier and an {@code RBuildFilesVisitor}, then visits the file
   * state keys that {@code env} derives from {@code fileIdentifiers}, blocking until the
   * visitation completes.
   *
   * @param fileIdentifiers the file fragments to start from
   * @param callback passed to the visitor to receive its results
   */
  static void getRBuildFilesParallel(
      SkyQueryEnvironment env,
      Collection<PathFragment> fileIdentifiers,
      Callback<Target> callback)
      throws QueryException, InterruptedException {
    Uniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
    RBuildFilesVisitor visitor = new RBuildFilesVisitor(env, keyUniquifier, callback);
    visitor.visitAndWaitForCompletion(env.getFileStateKeysForFileFragments(fileIdentifiers));
  }

  /**
   * Evaluates {@code expression} and feeds its results to a parallel visitor callback created from
   * {@code DepsUnboundedVisitor.Factory}.
   *
   * @param depsNeedFiltering passed to the visitor factory unchanged
   * @return the future returned by {@code env.eval}
   */
  static QueryTaskFuture<Void> getDepsUnboundedParallel(
      SkyQueryEnvironment env,
      QueryExpression expression,
      QueryExpressionContext<Target> context,
      Callback<Target> callback,
      MultisetSemaphore<PackageIdentifier> packageSemaphore,
      boolean depsNeedFiltering) {
    return env.eval(
        expression,
        context,
        ParallelVisitor.createParallelVisitorCallback(
            new DepsUnboundedVisitor.Factory(
                env, callback, packageSemaphore, depsNeedFiltering, context)));
  }

  /**
   * Value-like pair of a (possibly absent) dep key and an rdep key. Equality and hashing are
   * defined over both components.
   */
  static class DepAndRdep {
    @Nullable final SkyKey dep;
    final SkyKey rdep;

    DepAndRdep(@Nullable SkyKey dep, SkyKey rdep) {
      this.dep = dep;
      this.rdep = rdep;
    }

    @Override
    public boolean equals(Object obj) {
      if (!(obj instanceof DepAndRdep)) {
        return false;
      }
      DepAndRdep other = (DepAndRdep) obj;
      // dep may be null, so it is compared null-safely; rdep is dereferenced directly.
      return Objects.equals(dep, other.dep) && rdep.equals(other.rdep);
    }

    @Override
    public int hashCode() {
      // N.B. - We deliberately use a garbage-free hashCode implementation (rather than e.g.
      // Objects#hash). Depending on the structure of the graph being traversed, this method can
      // be very hot.
      return 31 * Objects.hashCode(dep) + rdep.hashCode();
    }
  }

  /** A {@link DepAndRdep} together with the depth recorded for its rdep. */
  static class DepAndRdepAtDepth {
    final DepAndRdep depAndRdep;
    final int rdepDepth;

    DepAndRdepAtDepth(DepAndRdep depAndRdep, int rdepDepth) {
      this.depAndRdep = depAndRdep;
      this.rdepDepth = rdepDepth;
    }
  }

  /** Thread-safe {@link AggregateAllCallback} backed by a concurrent {@link Set}. */
  @ThreadSafe
  private static class ThreadSafeAggregateAllSkyKeysCallback
      implements AggregateAllCallback<SkyKey, ImmutableSet<SkyKey>> {
    private final Set<SkyKey> results;

    private ThreadSafeAggregateAllSkyKeysCallback(int concurrencyLevel) {
      // Note that concurrencyLevel is used as the initial capacity of the backing map.
      this.results =
          Collections.newSetFromMap(
              new ConcurrentHashMap<>(
                  /*initialCapacity=*/ concurrencyLevel, /*loadFactor=*/ 0.75f));
    }

    /** Adds every element of {@code partialResult} to the aggregated set. */
    @Override
    public void process(Iterable<SkyKey> partialResult)
        throws QueryException, InterruptedException {
      Iterables.addAll(results, partialResult);
    }

    /** Returns an immutable snapshot of the keys aggregated so far. */
    @Override
    public ImmutableSet<SkyKey> getResult() {
      return ImmutableSet.copyOf(results);
    }
  }
}