diff options
author | Han-Wen Nienhuys <hanwen@google.com> | 2015-02-25 16:45:20 +0100 |
---|---|---|
committer | Han-Wen Nienhuys <hanwen@google.com> | 2015-02-25 16:45:20 +0100 |
commit | d08b27fa9701fecfdb69e1b0d1ac2459efc2129b (patch) | |
tree | 5d50963026239ca5aebfb47ea5b8db7e814e57c8 /src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java |
Update from Google.
--
MOE_MIGRATED_REVID=85702957
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java | 481 |
1 files changed, 481 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java new file mode 100644 index 0000000000..b72e3aac52 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java @@ -0,0 +1,481 @@ +// Copyright 2014 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.query2; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.MapMaker; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import com.google.devtools.build.lib.events.EventHandler; +import com.google.devtools.build.lib.packages.AggregatingAttributeMapper; +import com.google.devtools.build.lib.packages.Attribute; +import com.google.devtools.build.lib.packages.AttributeMap; +import com.google.devtools.build.lib.packages.InputFile; +import com.google.devtools.build.lib.packages.NoSuchThingException; +import com.google.devtools.build.lib.packages.OutputFile; +import com.google.devtools.build.lib.packages.Package; +import com.google.devtools.build.lib.packages.PackageGroup; +import com.google.devtools.build.lib.packages.Rule; +import com.google.devtools.build.lib.packages.Target; +import com.google.devtools.build.lib.pkgcache.PackageProvider; +import com.google.devtools.build.lib.pkgcache.TargetEdgeObserver; +import com.google.devtools.build.lib.syntax.Label; +import com.google.devtools.build.lib.util.BinaryPredicate; + +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * <p>Visit the transitive closure of a label. Primarily used to "fault in" + * packages to the packageProvider and ensure the necessary targets exists, in + * advance of the configuration step, which is intolerant of missing + * packages/targets. + * + * <p>LabelVisitor loads packages concurrently where possible, to increase I/O + * parallelism. However, the public interface is not thread-safe: calls to + * public methods should not be made concurrently. + * + * <p>LabelVisitor is stateful: It remembers the previous visitation and can + * check its validity on subsequent calls to sync() instead of doing the normal + * visitation. + * + * <p>TODO(bazel-team): (2009) a small further optimization could be achieved if we + * create tasks at the package (not individual label) level, since package + * loading is the expensive step. This would require additional bookkeeping to + * maintain the list of labels that we need to visit once a package becomes + * available. Profiling suggests that there is still a potential benefit to be + * gained: when the set of packages is known a-priori, loading a set of packages + * that took 20 seconds can be done under 5 in the sequential case or 7 in the + * current (parallel) case. + * + * <h4>Concurrency</h4> + * + * <p>The sync() methods of this class is thread-compatible. The accessor + * ({@link #hasVisited} and similar must not be called until the concurrent phase + * is over, i.e. all external calls to visit() methods have completed. + */ +final class LabelVisitor { + + /** + * Attributes of a visitation which determine whether it is up-to-date or not. + */ + private class VisitationAttributes { + private Collection<Target> targetsToVisit; + private boolean success = false; + private boolean visitSubincludes = true; + private int maxDepth = 0; + + /** + * Returns true if and only if this visitation attribute is still up-to-date. + */ + boolean current() { + return targetsToVisit.equals(lastVisitation.targetsToVisit) + && maxDepth <= lastVisitation.maxDepth + && visitSubincludes == lastVisitation.visitSubincludes; + } + } + + /* + * Interrupts during the loading phase =================================== + * + * Bazel can be interrupted in the middle of the loading phase. The mechanics + * of this are far from trivial, so there is an explanation of how they are + * supposed to work. For a description how the same thing works in the + * execution phase, see ParallelBuilder.java . + * + * The sequence of events that happen when the user presses Ctrl-C is the + * following: + * + * 1. A SIGINT gets delivered to the Bazel client process. + * + * 2. The client process delivers the SIGINT to the server process. + * + * 3. The interruption state of the main thread is set to true. + * + * 4. Sooner or later, this results in an InterruptedException being thrown. + * Usually this takes place because the main thread is interrupted during + * AbstractQueueVisitor.awaitTermination(). The only exception to this is when + * the interruption occurs during the loading of a package of a label + * specified on the command line; in this case, the InterruptedException is + * thrown during the loading of an individual package (see below where this + * can occur) + * + * 5. The main thread calls ThreadPoolExecutor.shutdown(), which in turn + * interrupts every worker thread. Then the main thread waits for their + * termination. + * + * 6. An InterruptedException is thrown during the loading of an individual + * package in the worker threads. + * + * 7. All worker threads terminate. + * + * 8. An InterruptedException is thrown from + * AbstractQueueVisitor.awaitTermination() + * + * 9. This exception causes the execution of the currently running command to + * terminate prematurely. + * + * The interruption of the loading of an individual package can happen in two + * different ways depending on whether Python preprocessing is in effect or + * not. + * + * If there is no Python preprocessing: + * + * 1. We periodically check the interruption state of the thread in + * UnixGlob.reallyGlob(). If it is interrupted, an InterruptedException is + * thrown. + * + * 2. The stack is unwound until we are out of the part of the call stack + * responsible for package loading. This either means that the worker thread + * terminates or that the label parsing terminates if the package that is + * being loaded was specified on the command line. + * + * If there is Python preprocessing, events are a bit more complicated. In + * this case, the real work happens on the thread the Python preprocessor is + * called from, but in a bit more convoluted way: a new thread is spawned by + * to handle the input from the Python process and + * the output to the Python process is handled on the main thread. The reading + * thread parses requests from the preprocessor, and passes them using a queue + * to the writing thread (that is, the main thread), so that we can do the + * work there. This is important because this way, we don't have any work that + * we need to interrupt in a thread that is not spawned by us. So: + * + * 1. The interrupted state of the main thread is set. + * + * 2. This results in an InterruptedException during the execution of the task + * in PythonStdinInputStream.getNextMessage(). + * + * 3. We exit from RequestParser.Request.run() prematurely, set a flag to + * signal that we were interrupted, and throw an InterruptedIOException. + * + * 4. The Python child process and reading thread are terminated. + * + * 5. Based on the flag we set in step 3, we realize that the termination was + * due to an interruption, and an InterruptedException is thrown. This can + * either raise an AbnormalTerminationException, or make Command.execute() + * return normally, so we check for both cases. + * + * 6. This InterruptedException causes the loading of the package to terminate + * prematurely. + * + * Life is not simple. + */ + private final PackageProvider packageProvider; + private final BinaryPredicate<Rule, Attribute> edgeFilter; + private final SetMultimap<Package, Target> visitedMap = + Multimaps.synchronizedSetMultimap(HashMultimap.<Package, Target>create()); + private final ConcurrentMap<Label, Integer> visitedTargets = new MapMaker().makeMap(); + + private VisitationAttributes lastVisitation; + + /** + * Constant for limiting the permitted depth of recursion. + */ + private static final int RECURSION_LIMIT = 100; + + /** + * Construct a LabelVisitor. + * + * @param packageProvider how to resolve labels to targets. + * @param edgeFilter which edges may be traversed. + */ + public LabelVisitor(PackageProvider packageProvider, + BinaryPredicate<Rule, Attribute> edgeFilter) { + this.packageProvider = packageProvider; + this.lastVisitation = new VisitationAttributes(); + this.edgeFilter = edgeFilter; + } + + boolean syncWithVisitor(EventHandler eventHandler, Collection<Target> targetsToVisit, + boolean keepGoing, int parallelThreads, int maxDepth, TargetEdgeObserver... observers) + throws InterruptedException { + VisitationAttributes nextVisitation = new VisitationAttributes(); + nextVisitation.targetsToVisit = targetsToVisit; + nextVisitation.maxDepth = maxDepth; + + if (!lastVisitation.success || !nextVisitation.current()) { + try { + nextVisitation.success = redoVisitation(eventHandler, nextVisitation, keepGoing, + parallelThreads, maxDepth, observers); + return nextVisitation.success; + } finally { + lastVisitation = nextVisitation; + } + } else { + return true; + } + } + + // Does a bounded transitive visitation starting at the given top-level targets. + private boolean redoVisitation(EventHandler eventHandler, + VisitationAttributes visitation, + boolean keepGoing, + int parallelThreads, + int maxDepth, + TargetEdgeObserver... observers) + throws InterruptedException { + visitedMap.clear(); + visitedTargets.clear(); + + Visitor visitor = new Visitor(eventHandler, keepGoing, parallelThreads, maxDepth, observers); + + Throwable uncaught = null; + boolean result; + try { + visitor.visitTargets(visitation.targetsToVisit); + } catch (Throwable t) { + visitor.stopNewActions(); + uncaught = t; + } finally { + // Run finish() in finally block to ensure we don't leak threads on exceptions. + result = visitor.finish(); + } + Throwables.propagateIfPossible(uncaught); + return result; + } + + boolean hasVisited(Label target) { + return visitedTargets.containsKey(target); + } + + @VisibleForTesting class Visitor extends AbstractQueueVisitor { + + private final static String THREAD_NAME = "LabelVisitor"; + + private final EventHandler eventHandler; + private final boolean keepGoing; + private final int maxDepth; + private final Iterable<TargetEdgeObserver> observers; + private final TargetEdgeErrorObserver errorObserver; + private final AtomicBoolean stopNewActions = new AtomicBoolean(false); + private static final boolean CONCURRENT = true; + + + public Visitor(EventHandler eventHandler, boolean keepGoing, int parallelThreads, + int maxDepth, TargetEdgeObserver... observers) { + // Observing the loading phase of a typical large package (with all subpackages) shows + // maximum thread-level concurrency of ~20. Limiting the total number of threads to 200 is + // therefore conservative and should help us avoid hitting native limits. + super(CONCURRENT, parallelThreads, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing, + THREAD_NAME); + this.eventHandler = eventHandler; + this.maxDepth = maxDepth; + this.errorObserver = new TargetEdgeErrorObserver(); + ImmutableList.Builder<TargetEdgeObserver> builder = ImmutableList.builder(); + for (TargetEdgeObserver observer : observers) { + builder.add(observer); + } + builder.add(errorObserver); + this.observers = builder.build(); + this.keepGoing = keepGoing; + } + + /** + * Visit the specified labels and follow the transitive closure of their + * outbound dependencies. + * + * @param targets the targets to visit + */ + @ThreadSafe + public void visitTargets(Iterable<Target> targets) { + for (Target target : targets) { + visit(null, null, target, 0, 0); + } + } + + @ThreadSafe + public boolean finish() throws InterruptedException { + work(true); + return !errorObserver.hasErrors(); + } + + @Override + protected boolean blockNewActions() { + return (!keepGoing && errorObserver.hasErrors()) || super.blockNewActions() || + stopNewActions.get(); + } + + public void stopNewActions() { + stopNewActions.set(true); + } + + private void enqueueTarget( + final Target from, final Attribute attr, final Label label, final int depth, + final int count) { + // Don't perform the targetProvider lookup if at the maximum depth already. + if (depth >= maxDepth) { + return; + } else if (attr != null && from instanceof Rule) { + if (!edgeFilter.apply((Rule) from, attr)) { + return; + } + } + + // Avoid thread-related overhead when not crossing packages. + // Can start a new thread when count reaches 100, to prevent infinite recursion. + if (from != null && from.getLabel().getPackageFragment() == label.getPackageFragment() && + !blockNewActions() && count < RECURSION_LIMIT) { + newVisitRunnable(from, attr, label, depth, count + 1).run(); + } else { + enqueue(newVisitRunnable(from, attr, label, depth, 0)); + } + } + + private Runnable newVisitRunnable(final Target from, final Attribute attr, final Label label, + final int depth, final int count) { + return new Runnable () { + @Override + public void run() { + try { + Target target = packageProvider.getTarget(eventHandler, label); + if (target == null) { + // Let target visitation continue so we can discover additional unknown inputs. + return; + } + visit(from, attr, packageProvider.getTarget(eventHandler, label), depth + 1, count); + } catch (NoSuchThingException e) { + observeError(from, label, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + }; + } + + private void visitTargetVisibility(Target target, int depth, int count) { + Attribute attribute = null; + if (target instanceof Rule) { + attribute = ((Rule) target).getRuleClassObject().getAttributeByName("visibility"); + } + + for (Label label : target.getVisibility().getDependencyLabels()) { + enqueueTarget(target, attribute, label, depth, count); + } + } + + /** + * Visit all the labels in a given rule. + * + * <p>Called in a worker thread if CONCURRENT. + * + * @param rule the rule to visit + */ + @ThreadSafe + private void visitRule(final Rule rule, final int depth, final int count) { + // Follow all labels defined by this rule: + AggregatingAttributeMapper.of(rule).visitLabels(new AttributeMap.AcceptsLabelAttribute() { + @Override + public void acceptLabelAttribute(Label label, Attribute attribute) { + enqueueTarget(rule, attribute, label, depth, count); + } + }); + } + + @ThreadSafe + private void visitPackageGroup(PackageGroup packageGroup, int depth, int count) { + for (final Label include : packageGroup.getIncludes()) { + enqueueTarget(packageGroup, null, include, depth, count); + } + } + + /** + * Visits the target and its package. + * + * <p>Potentially blocking invocations into the package cache are + * enqueued in the worker pool if CONCURRENT. + */ + private void visit( + Target from, Attribute attribute, final Target target, int depth, int count) { + if (depth > maxDepth) { + return; + } + + if (from != null) { + observeEdge(from, attribute, target); + } + + visitedMap.put(target.getPackage(), target); + visitTargetNode(target, depth, count); + } + + /** + * Visit the specified target. + * Called in a worker thread if CONCURRENT. + * + * @param target the target to visit + */ + private void visitTargetNode(Target target, int depth, int count) { + Integer minTargetDepth = visitedTargets.putIfAbsent(target.getLabel(), depth); + if (minTargetDepth != null) { + // The target was already visited at a greater depth. + // The closure we are about to build is therefore a subset of what + // has already been built, and we can skip it. + // Also special case MAX_VALUE, where we never want to revisit targets. + // (This avoids loading phase overhead outside of queries). + if (maxDepth == Integer.MAX_VALUE || minTargetDepth <= depth) { + return; + } + // Check again in case it was overwritten by another thread. + synchronized (visitedTargets) { + if (visitedTargets.get(target.getLabel()) <= depth) { + return; + } + visitedTargets.put(target.getLabel(), depth); + } + } + + observeNode(target); + if (target instanceof OutputFile) { + Rule rule = ((OutputFile) target).getGeneratingRule(); + observeEdge(target, null, rule); + // This is the only recursive call to visit which doesn't pass through enqueueTarget(). + visit(null, null, rule, depth + 1, count + 1); + visitTargetVisibility(target, depth, count); + } else if (target instanceof InputFile) { + visitTargetVisibility(target, depth, count); + } else if (target instanceof Rule) { + visitTargetVisibility(target, depth, count); + visitRule((Rule) target, depth, count); + } else if (target instanceof PackageGroup) { + visitPackageGroup((PackageGroup) target, depth, count); + } + } + + private void observeEdge(Target from, Attribute attribute, Target to) { + for (TargetEdgeObserver observer : observers) { + observer.edge(from, attribute, to); + } + } + + private void observeNode(Target target) { + for (TargetEdgeObserver observer : observers) { + observer.node(target); + } + } + + private void observeError(Target from, Label label, NoSuchThingException e) { + for (TargetEdgeObserver observer : observers) { + observer.missingEdge(from, label, e); + } + } + } +} |