// Copyright 2015 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.query2; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.common.base.Ascii; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.base.Throwables; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Collections2; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import com.google.common.collect.Streams; import com.google.common.util.concurrent.AsyncCallable; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import 
com.google.devtools.build.lib.actions.FileStateValue; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPattern; import com.google.devtools.build.lib.collect.compacthashset.CompactHashSet; import com.google.devtools.build.lib.concurrent.BlockingStack; import com.google.devtools.build.lib.concurrent.MultisetSemaphore; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.events.DelegatingEventHandler; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventKind; import com.google.devtools.build.lib.events.ExtendedEventHandler; import com.google.devtools.build.lib.packages.BuildFileContainsErrorsException; import com.google.devtools.build.lib.packages.DependencyFilter; import com.google.devtools.build.lib.packages.NoSuchPackageException; import com.google.devtools.build.lib.packages.NoSuchTargetException; import com.google.devtools.build.lib.packages.NoSuchThingException; import com.google.devtools.build.lib.packages.Package; import com.google.devtools.build.lib.packages.Rule; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.pkgcache.PathPackageLocator; import com.google.devtools.build.lib.pkgcache.TargetPatternEvaluator; import com.google.devtools.build.lib.profiler.AutoProfiler; import com.google.devtools.build.lib.query2.engine.AllRdepsFunction; import com.google.devtools.build.lib.query2.engine.Callback; import com.google.devtools.build.lib.query2.engine.KeyExtractor; import com.google.devtools.build.lib.query2.engine.MinDepthUniquifier; import com.google.devtools.build.lib.query2.engine.OutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.QueryEvalResult; import 
com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; import com.google.devtools.build.lib.query2.engine.QueryExpressionContext; import com.google.devtools.build.lib.query2.engine.QueryExpressionMapper; import com.google.devtools.build.lib.query2.engine.QueryUtil.MinDepthUniquifierImpl; import com.google.devtools.build.lib.query2.engine.QueryUtil.MutableKeyExtractorBackedMapImpl; import com.google.devtools.build.lib.query2.engine.QueryUtil.ThreadSafeMutableKeyExtractorBackedSetImpl; import com.google.devtools.build.lib.query2.engine.QueryUtil.UniquifierImpl; import com.google.devtools.build.lib.query2.engine.StreamableQueryEnvironment; import com.google.devtools.build.lib.query2.engine.ThreadSafeOutputFormatterCallback; import com.google.devtools.build.lib.query2.engine.Uniquifier; import com.google.devtools.build.lib.skyframe.BlacklistedPackagePrefixesValue; import com.google.devtools.build.lib.skyframe.ContainingPackageLookupFunction; import com.google.devtools.build.lib.skyframe.GraphBackedRecursivePackageProvider; import com.google.devtools.build.lib.skyframe.PackageLookupValue; import com.google.devtools.build.lib.skyframe.PackageValue; import com.google.devtools.build.lib.skyframe.PrepareDepsOfPatternsFunction; import com.google.devtools.build.lib.skyframe.RecursivePackageProviderBackedTargetPatternResolver; import com.google.devtools.build.lib.skyframe.TargetPatternValue; import com.google.devtools.build.lib.skyframe.TargetPatternValue.TargetPatternKey; import com.google.devtools.build.lib.skyframe.TransitiveTraversalValue; import com.google.devtools.build.lib.skyframe.TraversalInfoRootPackageExtractor; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.RootedPath; import com.google.devtools.build.skyframe.EvaluationResult; import com.google.devtools.build.skyframe.InterruptibleSupplier; import 
com.google.devtools.build.skyframe.SkyFunctionName; import com.google.devtools.build.skyframe.SkyKey; import com.google.devtools.build.skyframe.SkyValue; import com.google.devtools.build.skyframe.WalkableGraph; import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; /** * {@link AbstractBlazeQueryEnvironment} that introspects the Skyframe graph to find forward and * reverse edges. Results obtained by calling {@link #evaluateQuery} are not guaranteed to be in any * particular order. As well, this class eagerly loads the full transitive closure of targets, even * if the full closure isn't needed. * *

This class has concurrent implementations of the * {@link QueryTaskFuture}/{@link QueryTaskCallable} helper methods. The combination of this and the * asynchronous evaluation model yields parallel query evaluation. */ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment implements StreamableQueryEnvironment { // 10k is likely a good balance between using batch efficiently and not blowing up memory. // TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant. protected static final int BATCH_CALLBACK_SIZE = 10000; protected static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors(); private static final int MAX_QUERY_EXPRESSION_LOG_CHARS = 1000; private static final Logger logger = Logger.getLogger(SkyQueryEnvironment.class.getName()); private final BlazeTargetAccessor accessor = new BlazeTargetAccessor(this); protected final int loadingPhaseThreads; protected final WalkableGraphFactory graphFactory; protected final ImmutableList universeScope; protected boolean blockUniverseEvaluationErrors; protected ExtendedEventHandler universeEvalEventHandler; protected final String parserPrefix; protected final PathPackageLocator pkgPath; private final int queryEvaluationParallelismLevel; // The following fields are set in the #beforeEvaluateQuery method. 
private MultisetSemaphore packageSemaphore; protected WalkableGraph graph; private InterruptibleSupplier> blacklistPatternsSupplier; private GraphBackedRecursivePackageProvider graphBackedRecursivePackageProvider; private ListeningExecutorService executor; private RecursivePackageProviderBackedTargetPatternResolver resolver; protected final SkyKey universeKey; private final ImmutableList universeTargetPatternKeys; public SkyQueryEnvironment( boolean keepGoing, int loadingPhaseThreads, ExtendedEventHandler eventHandler, Set settings, Iterable extraFunctions, String parserPrefix, WalkableGraphFactory graphFactory, List universeScope, PathPackageLocator pkgPath, boolean blockUniverseEvaluationErrors) { this( keepGoing, loadingPhaseThreads, // SkyQueryEnvironment operates on a prepopulated Skyframe graph. Therefore, query // evaluation is completely CPU-bound. /*queryEvaluationParallelismLevel=*/ DEFAULT_THREAD_COUNT, eventHandler, settings, extraFunctions, parserPrefix, graphFactory, universeScope, pkgPath, blockUniverseEvaluationErrors); } protected SkyQueryEnvironment( boolean keepGoing, int loadingPhaseThreads, int queryEvaluationParallelismLevel, ExtendedEventHandler eventHandler, Set settings, Iterable extraFunctions, String parserPrefix, WalkableGraphFactory graphFactory, List universeScope, PathPackageLocator pkgPath, boolean blockUniverseEvaluationErrors) { super( keepGoing, /*strictScope=*/ true, /*labelFilter=*/ Rule.ALL_LABELS, eventHandler, settings, extraFunctions); this.loadingPhaseThreads = loadingPhaseThreads; this.graphFactory = graphFactory; this.pkgPath = pkgPath; this.universeScope = ImmutableList.copyOf(Preconditions.checkNotNull(universeScope)); this.parserPrefix = parserPrefix; Preconditions.checkState( !universeScope.isEmpty(), "No queries can be performed with an empty universe"); this.queryEvaluationParallelismLevel = queryEvaluationParallelismLevel; this.universeKey = graphFactory.getUniverseKey(universeScope, parserPrefix); 
this.blockUniverseEvaluationErrors = blockUniverseEvaluationErrors; this.universeEvalEventHandler = this.blockUniverseEvaluationErrors ? new ErrorBlockingForwardingEventHandler(this.eventHandler) : this.eventHandler; this.universeTargetPatternKeys = PrepareDepsOfPatternsFunction.getTargetPatternKeys( PrepareDepsOfPatternsFunction.getSkyKeys(universeKey, eventHandler)); } @Override public void close() { if (executor != null) { executor.shutdownNow(); executor = null; } } /** Gets roots of graph which contains all nodes needed to evaluate {@code expr}. */ protected Set getGraphRootsFromExpression(QueryExpression expr) throws QueryException, InterruptedException { return ImmutableSet.of(universeKey); } private void beforeEvaluateQuery(QueryExpression expr) throws QueryException, InterruptedException { Set roots = getGraphRootsFromExpression(expr); EvaluationResult result; try (AutoProfiler p = AutoProfiler.logged("evaluation and walkable graph", logger)) { result = graphFactory.prepareAndGet(roots, loadingPhaseThreads, universeEvalEventHandler); } if (graph == null || graph != result.getWalkableGraph()) { checkEvaluationResult(roots, result); packageSemaphore = makeFreshPackageMultisetSemaphore(); graph = result.getWalkableGraph(); blacklistPatternsSupplier = InterruptibleSupplier.Memoize.of(new BlacklistSupplier(graph)); graphBackedRecursivePackageProvider = new GraphBackedRecursivePackageProvider( graph, universeTargetPatternKeys, pkgPath, new TraversalInfoRootPackageExtractor()); } if (executor == null) { executor = MoreExecutors.listeningDecorator( new ThreadPoolExecutor( /*corePoolSize=*/ queryEvaluationParallelismLevel, /*maximumPoolSize=*/ queryEvaluationParallelismLevel, /*keepAliveTime=*/ 1, /*units=*/ TimeUnit.SECONDS, /*workQueue=*/ new BlockingStack(), new ThreadFactoryBuilder().setNameFormat("QueryEnvironment %d").build())); } resolver = new RecursivePackageProviderBackedTargetPatternResolver( graphBackedRecursivePackageProvider, eventHandler, 
TargetPatternEvaluator.DEFAULT_FILTERING_POLICY, packageSemaphore); } protected MultisetSemaphore makeFreshPackageMultisetSemaphore() { return MultisetSemaphore.unbounded(); } @ThreadSafe public MultisetSemaphore getPackageMultisetSemaphore() { return packageSemaphore; } protected void checkEvaluationResult(Set roots, EvaluationResult result) throws QueryException { // If the only root is the universe key, we expect to see either a single successfully evaluated // value or a cycle in the result. if (roots.size() == 1 && Iterables.getOnlyElement(roots).equals(universeKey)) { Collection values = result.values(); if (!values.isEmpty()) { Preconditions.checkState( values.size() == 1, "Universe query \"%s\" returned multiple values unexpectedly (%s values in result)", universeScope, values.size()); Preconditions.checkNotNull(result.get(universeKey), result); } else { // No values in the result, so there must be an error. We expect the error to be a cycle. boolean foundCycle = !Iterables.isEmpty(result.getError().getCycleInfo()); Preconditions.checkState( foundCycle, "Universe query \"%s\" failed with non-cycle error: %s", universeScope, result.getError()); } } } @Override public final QueryExpression transformParsedQuery(QueryExpression queryExpression) { QueryExpressionMapper mapper = getQueryExpressionMapper(); QueryExpression transformedQueryExpression = queryExpression.accept(mapper); logger.info( String.format( "transformed query [%s] to [%s]", Ascii.truncate( queryExpression.toString(), MAX_QUERY_EXPRESSION_LOG_CHARS, "[truncated]"), Ascii.truncate( transformedQueryExpression.toString(), MAX_QUERY_EXPRESSION_LOG_CHARS, "[truncated]"))); return transformedQueryExpression; } protected QueryExpressionMapper getQueryExpressionMapper() { if (universeScope.size() != 1) { return QueryExpressionMapper.identity(); } TargetPattern.Parser targetPatternParser = new TargetPattern.Parser(parserPrefix); String universeScopePattern = Iterables.getOnlyElement(universeScope); 
return new RdepsToAllRdepsQueryExpressionMapper(targetPatternParser, universeScopePattern);
  }

  /**
   * Evaluates the top-level expression and, if evaluation fails with any throwable, tears down
   * the query thread pool before rethrowing.
   *
   * <p>The {@code executor} field is cleared <em>before</em> waiting for the old pool to
   * terminate. That way the next evaluation always creates a fresh pool, even when the wait
   * itself is interrupted; otherwise a pool that has already been shut down would be reused.
   *
   * @throws QueryException if evaluation failed with a query error
   * @throws InterruptedException if evaluation was interrupted, or if the wait for the old pool
   *     to terminate was interrupted (the original failure is then attached as suppressed)
   */
  @Override
  protected void evalTopLevelInternal(QueryExpression expr, OutputFormatterCallback callback)
      throws QueryException, InterruptedException {
    Throwable throwableToThrow = null;
    try {
      super.evalTopLevelInternal(expr, callback);
    } catch (Throwable throwable) {
      throwableToThrow = throwable;
    } finally {
      if (throwableToThrow != null) {
        logger.log(
            Level.INFO,
            "About to shutdown query threadpool because of throwable",
            throwableToThrow);
        // Signal that executor must be recreated on the next invocation. This happens first so
        // that it holds no matter how the shutdown below ends.
        ListeningExecutorService obsoleteExecutor = executor;
        executor = null;
        // The pool may already be gone, e.g. if close() was called in the meantime.
        if (obsoleteExecutor != null) {
          // Force termination of remaining tasks if evaluation failed abruptly (e.g. was
          // interrupted). We don't want to leave any dangling threads running tasks.
          obsoleteExecutor.shutdownNow();
          try {
            obsoleteExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
          } catch (InterruptedException e) {
            // Keep the original failure visible instead of silently replacing it.
            e.addSuppressed(throwableToThrow);
            throw e;
          }
        }
        Throwables.propagateIfPossible(
            throwableToThrow, QueryException.class, InterruptedException.class);
      }
    }
  }

  @Override
  public QueryEvalResult evaluateQuery(
      QueryExpression expr, ThreadSafeOutputFormatterCallback callback)
      throws QueryException, InterruptedException, IOException {
    beforeEvaluateQuery(expr);

    // SkyQueryEnvironment batches callback invocations using a BatchStreamedCallback, created here
    // so that there's one per top-level evaluateQuery call. The batch size is large enough that
    // per-call costs of calling the original callback are amortized over a good number of targets,
    // and small enough that holding a batch of targets in memory doesn't risk an OOM error.
    //
    // This flushes the batched callback prior to constructing the QueryEvalResult in the unlikely
    // case of a race between the original callback and the eventHandler.
BatchStreamedCallback batchCallback = new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE); return super.evaluateQuery(expr, batchCallback); } private Map> targetifyValues( Map> input) throws InterruptedException { return targetifyValues( input, makePackageKeyToTargetKeyMap(ImmutableSet.copyOf(Iterables.concat(input.values())))); } private Map> targetifyValues( Map> input, Multimap packageKeyToTargetKeyMap) throws InterruptedException { ImmutableMap.Builder> result = ImmutableMap.builder(); Map allTargets = makeTargetsFromPackageKeyToTargetKeyMap(packageKeyToTargetKeyMap); for (Map.Entry> entry : input.entrySet()) { Iterable skyKeys = entry.getValue(); Set targets = CompactHashSet.createWithExpectedSize(Iterables.size(skyKeys)); for (SkyKey key : skyKeys) { Target target = allTargets.get(key); if (target != null) { targets.add(target); } } result.put(entry.getKey(), targets); } return result.build(); } private Map> getRawReverseDeps( Iterable transitiveTraversalKeys) throws InterruptedException { return targetifyValues(graph.getReverseDeps(transitiveTraversalKeys)); } private Set

The implementation of this method does not filter out deps due to disallowed edges, * therefore callers are responsible for doing the right thing themselves. */ public Multimap getUnfilteredDirectDepsOfSkyKeys(Iterable keys) throws InterruptedException { ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); graph.getDirectDeps(keys).forEach(builder::putAll); return builder.build(); } @Override public Collection getReverseDeps( Iterable targets, QueryExpressionContext context) throws InterruptedException { return getReverseDepsOfTransitiveTraversalKeys(Iterables.transform(targets, TARGET_TO_SKY_KEY)); } private Collection getReverseDepsOfTransitiveTraversalKeys( Iterable transitiveTraversalKeys) throws InterruptedException { Map> rawReverseDeps = getRawReverseDeps(transitiveTraversalKeys); return processRawReverseDeps(rawReverseDeps); } /** Targetify SkyKeys of reverse deps and filter out targets whose deps are not allowed. */ Collection filterRawReverseDepsOfTransitiveTraversalKeys( Map> rawReverseDeps, Multimap packageKeyToTargetKeyMap) throws InterruptedException { return processRawReverseDeps(targetifyValues(rawReverseDeps, packageKeyToTargetKeyMap)); } private Collection processRawReverseDeps(Map> rawReverseDeps) throws InterruptedException { Set result = CompactHashSet.create(); CompactHashSet visited = CompactHashSet.createWithExpectedSize(totalSizeOfCollections(rawReverseDeps.values())); Set

This is a helper function for {@link #getFileStateKeysForFileFragments}. */ private static Iterable getPkgLookupKeysForFile(PathFragment originalFileFragment, PathFragment currentPathFragment) { if (originalFileFragment.equals(currentPathFragment) && originalFileFragment.equals(Label.WORKSPACE_FILE_NAME)) { // TODO(mschaller): this should not be checked at runtime. These are constants! Preconditions.checkState( Label.WORKSPACE_FILE_NAME.getParentDirectory().equals(PathFragment.EMPTY_FRAGMENT), Label.WORKSPACE_FILE_NAME); return ImmutableList.of( PackageLookupValue.key(Label.EXTERNAL_PACKAGE_IDENTIFIER), PackageLookupValue.key(PackageIdentifier.createInMainRepo(PathFragment.EMPTY_FRAGMENT))); } PathFragment parentPathFragment = currentPathFragment.getParentDirectory(); return parentPathFragment == null ? ImmutableList.of() : ImmutableList.of( PackageLookupValue.key(PackageIdentifier.createInMainRepo(parentPathFragment))); } /** * Returns FileStateValue keys for which there may be relevant (from the perspective of {@link * #getRBuildFiles}) FileStateValues in the graph corresponding to the given * {@code pathFragments}, which are assumed to be file paths. * *

<p>To do this, we emulate the {@link ContainingPackageLookupFunction} logic: for each given
   * file path, we look for the nearest ancestor directory (starting with its parent directory), if
   * any, that has a package. The {@link PackageLookupValue} for this package tells us the package
   * root that we should use for the {@link RootedPath} for the {@link FileStateValue} key.
   *
   *

Note that there may not be nodes in the graph corresponding to the returned SkyKeys.
   */
  Collection getFileStateKeysForFileFragments(Iterable pathFragments)
      throws InterruptedException {
    Set result = new HashSet<>();
    // Maps the path currently being probed for a package ("current") to the file path(s) the
    // probe started from ("original"). Every file path starts out mapped to itself.
    Multimap currentToOriginal = ArrayListMultimap.create();
    for (PathFragment pathFragment : pathFragments) {
      currentToOriginal.put(pathFragment, pathFragment);
    }
    // Each pass probes one level; a path leaves the map once a package is found for it or once
    // it has no parent directory left.
    while (!currentToOriginal.isEmpty()) {
      // Index the candidate package lookup keys both by original file and by current path, so a
      // hit can emit keys for the originals and retire the currents.
      Multimap packageLookupKeysToOriginal = ArrayListMultimap.create();
      Multimap packageLookupKeysToCurrent = ArrayListMultimap.create();
      for (Map.Entry entry : currentToOriginal.entries()) {
        PathFragment current = entry.getKey();
        PathFragment original = entry.getValue();
        for (SkyKey packageLookupKey : getPkgLookupKeysForFile(original, current)) {
          packageLookupKeysToOriginal.put(packageLookupKey, original);
          packageLookupKeysToCurrent.put(packageLookupKey, current);
        }
      }
      // One batched graph read per level; only successfully computed lookup values come back.
      Map lookupValues =
          graph.getSuccessfulValues(packageLookupKeysToOriginal.keySet());
      for (Map.Entry entry : lookupValues.entrySet()) {
        SkyKey packageLookupKey = entry.getKey();
        PackageLookupValue packageLookupValue = (PackageLookupValue) entry.getValue();
        if (packageLookupValue.packageExists()) {
          Collection originalFiles = packageLookupKeysToOriginal.get(packageLookupKey);
          Preconditions.checkState(!originalFiles.isEmpty(), entry);
          // The key is built from the package's root and the ORIGINAL file path, not the
          // ancestor directory that was probed.
          for (PathFragment fileName : originalFiles) {
            result.add(FileStateValue.key(
                RootedPath.toRootedPath(packageLookupValue.getRoot(), fileName)));
          }
          // These paths are resolved; drop them so they are not carried up to the next level.
          for (PathFragment current : packageLookupKeysToCurrent.get(packageLookupKey)) {
            currentToOriginal.removeAll(current);
          }
        }
      }
      // Move every unresolved path one directory up; paths without a parent are dropped.
      Multimap newCurrentToOriginal = ArrayListMultimap.create();
      for (PathFragment pathFragment : currentToOriginal.keySet()) {
        PathFragment parent = pathFragment.getParentDirectory();
        if (parent != null) {
          newCurrentToOriginal.putAll(parent, currentToOriginal.get(pathFragment));
        }
      }
      currentToOriginal = newCurrentToOriginal;
    }
    return result;
  }

  protected void getBuildFileTargetsForPackageKeysAndProcessViaCallback(
Iterable packageKeys, Callback callback) throws QueryException, InterruptedException { Set pkgIds = Streams.stream(packageKeys) .map(SkyQueryEnvironment.PACKAGE_SKYKEY_TO_PACKAGE_IDENTIFIER) .collect(toImmutableSet()); packageSemaphore.acquireAll(pkgIds); try { Iterable buildFileTargets = Iterables.transform( graph.getSuccessfulValues(packageKeys).values(), skyValue -> ((PackageValue) skyValue).getPackage().getBuildFile()); callback.process(buildFileTargets); } finally { packageSemaphore.releaseAll(pkgIds); } } /** * Calculates the set of packages that transitively depend on, via load statements, the specified * paths. The emitted {@link Target}s are BUILD file targets. */ @ThreadSafe QueryTaskFuture getRBuildFiles( Collection fileIdentifiers, Callback callback) { return QueryTaskFutureImpl.ofDelegate( safeSubmit( () -> { ParallelSkyQueryUtils.getRBuildFilesParallel( SkyQueryEnvironment.this, fileIdentifiers, callback); return null; })); } @Override public Iterable getFunctions() { return ImmutableList.builder() .addAll(super.getFunctions()) .add(new AllRdepsFunction()) .add(new RBuildFilesFunction()) .build(); } private static class BlacklistSupplier implements InterruptibleSupplier> { private final WalkableGraph graph; private BlacklistSupplier(WalkableGraph graph) { this.graph = graph; } @Override public ImmutableSet get() throws InterruptedException { return ((BlacklistedPackagePrefixesValue) graph.getValue(BlacklistedPackagePrefixesValue.key())) .getPatterns(); } } private static class SkyKeyKeyExtractor implements KeyExtractor { private static final SkyKeyKeyExtractor INSTANCE = new SkyKeyKeyExtractor(); private SkyKeyKeyExtractor() { } @Override public SkyKey extractKey(SkyKey element) { return element; } } /** * Wraps a {@link Callback} and guarantees that all calls to the original will have at least * {@code batchThreshold} {@link Target}s, except for the final such call. * *

<p>Retains fewer than {@code batchThreshold} {@link Target}s at a time.
   *
   *

<p>After this object's {@link #process} has been called for the last time, {@link
   * #processLastPending} must be called to "flush" any remaining {@link Target}s through to the
   * original.
   *
   *

This callback may be called from multiple threads concurrently. At most one thread will call * the wrapped {@code callback} concurrently. */ // TODO(nharmata): For queries with less than {@code batchThreshold} results, this batching // strategy probably hurts performance since we can only start formatting results once the entire // query is finished. // TODO(nharmata): This batching strategy is also potentially harmful from a memory perspective // since when the Targets being output are backed by Package instances, we're delaying GC of the // Package instances until the output batch size is met. private static class BatchStreamedCallback extends ThreadSafeOutputFormatterCallback implements Callback { // TODO(nharmata): Now that we know the wrapped callback is ThreadSafe, there's no correctness // concern that requires the prohibition of concurrent uses of the callback; the only concern is // memory. We should have a threshold for when to invoke the callback with a batch, and also a // separate, larger, bound on the number of targets being processed at the same time. 
private final ThreadSafeOutputFormatterCallback callback; private final UniquifierImpl uniquifier = new UniquifierImpl<>(TargetKeyExtractor.INSTANCE); private final Object pendingLock = new Object(); private List pending = new ArrayList<>(); private int batchThreshold; private BatchStreamedCallback( ThreadSafeOutputFormatterCallback callback, int batchThreshold) { this.callback = callback; this.batchThreshold = batchThreshold; } @Override public void start() throws IOException { callback.start(); } @Override public void processOutput(Iterable partialResult) throws IOException, InterruptedException { ImmutableList uniquifiedTargets = uniquifier.unique(partialResult); synchronized (pendingLock) { Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed"); pending.addAll(uniquifiedTargets); if (pending.size() >= batchThreshold) { callback.processOutput(pending); pending = new ArrayList<>(); } } } @Override public void close(boolean failFast) throws IOException, InterruptedException { if (!failFast) { processLastPending(); } callback.close(failFast); } private void processLastPending() throws IOException, InterruptedException { synchronized (pendingLock) { if (!pending.isEmpty()) { callback.processOutput(pending); pending = null; } } } } @ThreadSafe @Override public QueryTaskFuture getAllRdepsUnboundedParallel( QueryExpression expression, QueryExpressionContext context, Callback callback) { return ParallelSkyQueryUtils.getAllRdepsUnboundedParallel( this, expression, context, callback, packageSemaphore); } @ThreadSafe @Override public QueryTaskFuture getAllRdepsBoundedParallel( QueryExpression expression, int depth, QueryExpressionContext context, Callback callback) { return ParallelSkyQueryUtils.getAllRdepsBoundedParallel( this, expression, depth, context, callback, packageSemaphore); } protected QueryTaskFuture> getUnfilteredUniverseDTCSkyKeyPredicateFuture( QueryExpression universe, QueryExpressionContext context) { return 
ParallelSkyQueryUtils.getDTCSkyKeyPredicateFuture( this, universe, context, BATCH_CALLBACK_SIZE, DEFAULT_THREAD_COUNT); } @ThreadSafe @Override public QueryTaskFuture getRdepsUnboundedParallel( QueryExpression expression, QueryExpression universe, QueryExpressionContext context, Callback callback) { return transformAsync( // Even if we need to do edge filtering, it's fine to construct the rdeps universe via an // unfiltered DTC visitation; the subsequent rdeps visitation will perform the edge // filtering. getUnfilteredUniverseDTCSkyKeyPredicateFuture(universe, context), unfilteredUniversePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseUnboundedParallel( this, expression, unfilteredUniversePredicate, context, callback, packageSemaphore)); } @Override public QueryTaskFuture getDepsUnboundedParallel( QueryExpression expression, QueryExpressionContext context, Callback callback) { return ParallelSkyQueryUtils.getDepsUnboundedParallel( SkyQueryEnvironment.this, expression, context, callback, packageSemaphore, /*depsNeedFiltering=*/ !dependencyFilter.equals(DependencyFilter.ALL_DEPS)); } @ThreadSafe @Override public QueryTaskFuture getRdepsBoundedParallel( QueryExpression expression, int depth, QueryExpression universe, QueryExpressionContext context, Callback callback) { return transformAsync( // Even if we need to do edge filtering, it's fine to construct the rdeps universe via an // unfiltered DTC visitation; the subsequent rdeps visitation will perform the edge // filtering. getUnfilteredUniverseDTCSkyKeyPredicateFuture(universe, context), universePredicate -> ParallelSkyQueryUtils.getRdepsInUniverseBoundedParallel( this, expression, depth, universePredicate, context, callback, packageSemaphore)); } /** * Query evaluation behavior is specified with respect to errors it emits. (Or at least it should * be. Tools rely on it.) Notably, errors that occur during evaluation of a query's universe must * not be emitted during query command evaluation. 
Consider the case of a simple single target * query when {@code //...} is the universe: errors in far flung parts of the workspace should not * be emitted when that query command is evaluated. * *

<p>Non-error message events are not specified. For instance, it's useful (and expected by some
   * unit tests that should know better) for query commands to emit {@link EventKind#PROGRESS}
   * events during package loading.
   *
   *

Therefore, this class is used to forward only non-{@link EventKind#ERROR} events during * universe loading to the {@link SkyQueryEnvironment}'s {@link ExtendedEventHandler}. */ protected static class ErrorBlockingForwardingEventHandler extends DelegatingEventHandler { public ErrorBlockingForwardingEventHandler(ExtendedEventHandler delegate) { super(delegate); } @Override public void handle(Event e) { if (!e.getKind().equals(EventKind.ERROR)) { super.handle(e); } } } }