aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java481
1 files changed, 481 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
new file mode 100644
index 0000000000..b72e3aac52
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/LabelVisitor.java
@@ -0,0 +1,481 @@
+// Copyright 2014 Google Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.query2;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.MapMaker;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.build.lib.events.EventHandler;
+import com.google.devtools.build.lib.packages.AggregatingAttributeMapper;
+import com.google.devtools.build.lib.packages.Attribute;
+import com.google.devtools.build.lib.packages.AttributeMap;
+import com.google.devtools.build.lib.packages.InputFile;
+import com.google.devtools.build.lib.packages.NoSuchThingException;
+import com.google.devtools.build.lib.packages.OutputFile;
+import com.google.devtools.build.lib.packages.Package;
+import com.google.devtools.build.lib.packages.PackageGroup;
+import com.google.devtools.build.lib.packages.Rule;
+import com.google.devtools.build.lib.packages.Target;
+import com.google.devtools.build.lib.pkgcache.PackageProvider;
+import com.google.devtools.build.lib.pkgcache.TargetEdgeObserver;
+import com.google.devtools.build.lib.syntax.Label;
+import com.google.devtools.build.lib.util.BinaryPredicate;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * <p>Visit the transitive closure of a label. Primarily used to "fault in"
+ * packages to the packageProvider and ensure the necessary targets exists, in
+ * advance of the configuration step, which is intolerant of missing
+ * packages/targets.
+ *
+ * <p>LabelVisitor loads packages concurrently where possible, to increase I/O
+ * parallelism. However, the public interface is not thread-safe: calls to
+ * public methods should not be made concurrently.
+ *
+ * <p>LabelVisitor is stateful: It remembers the previous visitation and can
+ * check its validity on subsequent calls to sync() instead of doing the normal
+ * visitation.
+ *
+ * <p>TODO(bazel-team): (2009) a small further optimization could be achieved if we
+ * create tasks at the package (not individual label) level, since package
+ * loading is the expensive step. This would require additional bookkeeping to
+ * maintain the list of labels that we need to visit once a package becomes
+ * available. Profiling suggests that there is still a potential benefit to be
+ * gained: when the set of packages is known a-priori, loading a set of packages
+ * that took 20 seconds can be done under 5 in the sequential case or 7 in the
+ * current (parallel) case.
+ *
+ * <h4>Concurrency</h4>
+ *
+ * <p>The sync() methods of this class is thread-compatible. The accessor
+ * ({@link #hasVisited} and similar must not be called until the concurrent phase
+ * is over, i.e. all external calls to visit() methods have completed.
+ */
+final class LabelVisitor {
+
+ /**
+ * Attributes of a visitation which determine whether it is up-to-date or not.
+ */
+ private class VisitationAttributes {
+ private Collection<Target> targetsToVisit;
+ private boolean success = false;
+ private boolean visitSubincludes = true;
+ private int maxDepth = 0;
+
+ /**
+ * Returns true if and only if this visitation attribute is still up-to-date.
+ */
+ boolean current() {
+ return targetsToVisit.equals(lastVisitation.targetsToVisit)
+ && maxDepth <= lastVisitation.maxDepth
+ && visitSubincludes == lastVisitation.visitSubincludes;
+ }
+ }
+
+ /*
+ * Interrupts during the loading phase ===================================
+ *
+ * Bazel can be interrupted in the middle of the loading phase. The mechanics
+ * of this are far from trivial, so there is an explanation of how they are
+ * supposed to work. For a description how the same thing works in the
+ * execution phase, see ParallelBuilder.java .
+ *
+ * The sequence of events that happen when the user presses Ctrl-C is the
+ * following:
+ *
+ * 1. A SIGINT gets delivered to the Bazel client process.
+ *
+ * 2. The client process delivers the SIGINT to the server process.
+ *
+ * 3. The interruption state of the main thread is set to true.
+ *
+ * 4. Sooner or later, this results in an InterruptedException being thrown.
+ * Usually this takes place because the main thread is interrupted during
+ * AbstractQueueVisitor.awaitTermination(). The only exception to this is when
+ * the interruption occurs during the loading of a package of a label
+ * specified on the command line; in this case, the InterruptedException is
+ * thrown during the loading of an individual package (see below where this
+ * can occur)
+ *
+ * 5. The main thread calls ThreadPoolExecutor.shutdown(), which in turn
+ * interrupts every worker thread. Then the main thread waits for their
+ * termination.
+ *
+ * 6. An InterruptedException is thrown during the loading of an individual
+ * package in the worker threads.
+ *
+ * 7. All worker threads terminate.
+ *
+ * 8. An InterruptedException is thrown from
+ * AbstractQueueVisitor.awaitTermination()
+ *
+ * 9. This exception causes the execution of the currently running command to
+ * terminate prematurely.
+ *
+ * The interruption of the loading of an individual package can happen in two
+ * different ways depending on whether Python preprocessing is in effect or
+ * not.
+ *
+ * If there is no Python preprocessing:
+ *
+ * 1. We periodically check the interruption state of the thread in
+ * UnixGlob.reallyGlob(). If it is interrupted, an InterruptedException is
+ * thrown.
+ *
+ * 2. The stack is unwound until we are out of the part of the call stack
+ * responsible for package loading. This either means that the worker thread
+ * terminates or that the label parsing terminates if the package that is
+ * being loaded was specified on the command line.
+ *
+ * If there is Python preprocessing, events are a bit more complicated. In
+ * this case, the real work happens on the thread the Python preprocessor is
+ * called from, but in a bit more convoluted way: a new thread is spawned by
+ * to handle the input from the Python process and
+ * the output to the Python process is handled on the main thread. The reading
+ * thread parses requests from the preprocessor, and passes them using a queue
+ * to the writing thread (that is, the main thread), so that we can do the
+ * work there. This is important because this way, we don't have any work that
+ * we need to interrupt in a thread that is not spawned by us. So:
+ *
+ * 1. The interrupted state of the main thread is set.
+ *
+ * 2. This results in an InterruptedException during the execution of the task
+ * in PythonStdinInputStream.getNextMessage().
+ *
+ * 3. We exit from RequestParser.Request.run() prematurely, set a flag to
+ * signal that we were interrupted, and throw an InterruptedIOException.
+ *
+ * 4. The Python child process and reading thread are terminated.
+ *
+ * 5. Based on the flag we set in step 3, we realize that the termination was
+ * due to an interruption, and an InterruptedException is thrown. This can
+ * either raise an AbnormalTerminationException, or make Command.execute()
+ * return normally, so we check for both cases.
+ *
+ * 6. This InterruptedException causes the loading of the package to terminate
+ * prematurely.
+ *
+ * Life is not simple.
+ */
+ private final PackageProvider packageProvider;
+ private final BinaryPredicate<Rule, Attribute> edgeFilter;
+ private final SetMultimap<Package, Target> visitedMap =
+ Multimaps.synchronizedSetMultimap(HashMultimap.<Package, Target>create());
+ private final ConcurrentMap<Label, Integer> visitedTargets = new MapMaker().makeMap();
+
+ private VisitationAttributes lastVisitation;
+
+ /**
+ * Constant for limiting the permitted depth of recursion.
+ */
+ private static final int RECURSION_LIMIT = 100;
+
+ /**
+ * Construct a LabelVisitor.
+ *
+ * @param packageProvider how to resolve labels to targets.
+ * @param edgeFilter which edges may be traversed.
+ */
+ public LabelVisitor(PackageProvider packageProvider,
+ BinaryPredicate<Rule, Attribute> edgeFilter) {
+ this.packageProvider = packageProvider;
+ this.lastVisitation = new VisitationAttributes();
+ this.edgeFilter = edgeFilter;
+ }
+
+ boolean syncWithVisitor(EventHandler eventHandler, Collection<Target> targetsToVisit,
+ boolean keepGoing, int parallelThreads, int maxDepth, TargetEdgeObserver... observers)
+ throws InterruptedException {
+ VisitationAttributes nextVisitation = new VisitationAttributes();
+ nextVisitation.targetsToVisit = targetsToVisit;
+ nextVisitation.maxDepth = maxDepth;
+
+ if (!lastVisitation.success || !nextVisitation.current()) {
+ try {
+ nextVisitation.success = redoVisitation(eventHandler, nextVisitation, keepGoing,
+ parallelThreads, maxDepth, observers);
+ return nextVisitation.success;
+ } finally {
+ lastVisitation = nextVisitation;
+ }
+ } else {
+ return true;
+ }
+ }
+
+ // Does a bounded transitive visitation starting at the given top-level targets.
+ private boolean redoVisitation(EventHandler eventHandler,
+ VisitationAttributes visitation,
+ boolean keepGoing,
+ int parallelThreads,
+ int maxDepth,
+ TargetEdgeObserver... observers)
+ throws InterruptedException {
+ visitedMap.clear();
+ visitedTargets.clear();
+
+ Visitor visitor = new Visitor(eventHandler, keepGoing, parallelThreads, maxDepth, observers);
+
+ Throwable uncaught = null;
+ boolean result;
+ try {
+ visitor.visitTargets(visitation.targetsToVisit);
+ } catch (Throwable t) {
+ visitor.stopNewActions();
+ uncaught = t;
+ } finally {
+ // Run finish() in finally block to ensure we don't leak threads on exceptions.
+ result = visitor.finish();
+ }
+ Throwables.propagateIfPossible(uncaught);
+ return result;
+ }
+
+ boolean hasVisited(Label target) {
+ return visitedTargets.containsKey(target);
+ }
+
+ @VisibleForTesting class Visitor extends AbstractQueueVisitor {
+
+ private final static String THREAD_NAME = "LabelVisitor";
+
+ private final EventHandler eventHandler;
+ private final boolean keepGoing;
+ private final int maxDepth;
+ private final Iterable<TargetEdgeObserver> observers;
+ private final TargetEdgeErrorObserver errorObserver;
+ private final AtomicBoolean stopNewActions = new AtomicBoolean(false);
+ private static final boolean CONCURRENT = true;
+
+
+ public Visitor(EventHandler eventHandler, boolean keepGoing, int parallelThreads,
+ int maxDepth, TargetEdgeObserver... observers) {
+ // Observing the loading phase of a typical large package (with all subpackages) shows
+ // maximum thread-level concurrency of ~20. Limiting the total number of threads to 200 is
+ // therefore conservative and should help us avoid hitting native limits.
+ super(CONCURRENT, parallelThreads, parallelThreads, 1L, TimeUnit.SECONDS, !keepGoing,
+ THREAD_NAME);
+ this.eventHandler = eventHandler;
+ this.maxDepth = maxDepth;
+ this.errorObserver = new TargetEdgeErrorObserver();
+ ImmutableList.Builder<TargetEdgeObserver> builder = ImmutableList.builder();
+ for (TargetEdgeObserver observer : observers) {
+ builder.add(observer);
+ }
+ builder.add(errorObserver);
+ this.observers = builder.build();
+ this.keepGoing = keepGoing;
+ }
+
+ /**
+ * Visit the specified labels and follow the transitive closure of their
+ * outbound dependencies.
+ *
+ * @param targets the targets to visit
+ */
+ @ThreadSafe
+ public void visitTargets(Iterable<Target> targets) {
+ for (Target target : targets) {
+ visit(null, null, target, 0, 0);
+ }
+ }
+
+ @ThreadSafe
+ public boolean finish() throws InterruptedException {
+ work(true);
+ return !errorObserver.hasErrors();
+ }
+
+ @Override
+ protected boolean blockNewActions() {
+ return (!keepGoing && errorObserver.hasErrors()) || super.blockNewActions() ||
+ stopNewActions.get();
+ }
+
+ public void stopNewActions() {
+ stopNewActions.set(true);
+ }
+
+ private void enqueueTarget(
+ final Target from, final Attribute attr, final Label label, final int depth,
+ final int count) {
+ // Don't perform the targetProvider lookup if at the maximum depth already.
+ if (depth >= maxDepth) {
+ return;
+ } else if (attr != null && from instanceof Rule) {
+ if (!edgeFilter.apply((Rule) from, attr)) {
+ return;
+ }
+ }
+
+ // Avoid thread-related overhead when not crossing packages.
+ // Can start a new thread when count reaches 100, to prevent infinite recursion.
+ if (from != null && from.getLabel().getPackageFragment() == label.getPackageFragment() &&
+ !blockNewActions() && count < RECURSION_LIMIT) {
+ newVisitRunnable(from, attr, label, depth, count + 1).run();
+ } else {
+ enqueue(newVisitRunnable(from, attr, label, depth, 0));
+ }
+ }
+
+ private Runnable newVisitRunnable(final Target from, final Attribute attr, final Label label,
+ final int depth, final int count) {
+ return new Runnable () {
+ @Override
+ public void run() {
+ try {
+ Target target = packageProvider.getTarget(eventHandler, label);
+ if (target == null) {
+ // Let target visitation continue so we can discover additional unknown inputs.
+ return;
+ }
+ visit(from, attr, packageProvider.getTarget(eventHandler, label), depth + 1, count);
+ } catch (NoSuchThingException e) {
+ observeError(from, label, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+ }
+
+ private void visitTargetVisibility(Target target, int depth, int count) {
+ Attribute attribute = null;
+ if (target instanceof Rule) {
+ attribute = ((Rule) target).getRuleClassObject().getAttributeByName("visibility");
+ }
+
+ for (Label label : target.getVisibility().getDependencyLabels()) {
+ enqueueTarget(target, attribute, label, depth, count);
+ }
+ }
+
+ /**
+ * Visit all the labels in a given rule.
+ *
+ * <p>Called in a worker thread if CONCURRENT.
+ *
+ * @param rule the rule to visit
+ */
+ @ThreadSafe
+ private void visitRule(final Rule rule, final int depth, final int count) {
+ // Follow all labels defined by this rule:
+ AggregatingAttributeMapper.of(rule).visitLabels(new AttributeMap.AcceptsLabelAttribute() {
+ @Override
+ public void acceptLabelAttribute(Label label, Attribute attribute) {
+ enqueueTarget(rule, attribute, label, depth, count);
+ }
+ });
+ }
+
+ @ThreadSafe
+ private void visitPackageGroup(PackageGroup packageGroup, int depth, int count) {
+ for (final Label include : packageGroup.getIncludes()) {
+ enqueueTarget(packageGroup, null, include, depth, count);
+ }
+ }
+
+ /**
+ * Visits the target and its package.
+ *
+ * <p>Potentially blocking invocations into the package cache are
+ * enqueued in the worker pool if CONCURRENT.
+ */
+ private void visit(
+ Target from, Attribute attribute, final Target target, int depth, int count) {
+ if (depth > maxDepth) {
+ return;
+ }
+
+ if (from != null) {
+ observeEdge(from, attribute, target);
+ }
+
+ visitedMap.put(target.getPackage(), target);
+ visitTargetNode(target, depth, count);
+ }
+
+ /**
+ * Visit the specified target.
+ * Called in a worker thread if CONCURRENT.
+ *
+ * @param target the target to visit
+ */
+ private void visitTargetNode(Target target, int depth, int count) {
+ Integer minTargetDepth = visitedTargets.putIfAbsent(target.getLabel(), depth);
+ if (minTargetDepth != null) {
+ // The target was already visited at a greater depth.
+ // The closure we are about to build is therefore a subset of what
+ // has already been built, and we can skip it.
+ // Also special case MAX_VALUE, where we never want to revisit targets.
+ // (This avoids loading phase overhead outside of queries).
+ if (maxDepth == Integer.MAX_VALUE || minTargetDepth <= depth) {
+ return;
+ }
+ // Check again in case it was overwritten by another thread.
+ synchronized (visitedTargets) {
+ if (visitedTargets.get(target.getLabel()) <= depth) {
+ return;
+ }
+ visitedTargets.put(target.getLabel(), depth);
+ }
+ }
+
+ observeNode(target);
+ if (target instanceof OutputFile) {
+ Rule rule = ((OutputFile) target).getGeneratingRule();
+ observeEdge(target, null, rule);
+ // This is the only recursive call to visit which doesn't pass through enqueueTarget().
+ visit(null, null, rule, depth + 1, count + 1);
+ visitTargetVisibility(target, depth, count);
+ } else if (target instanceof InputFile) {
+ visitTargetVisibility(target, depth, count);
+ } else if (target instanceof Rule) {
+ visitTargetVisibility(target, depth, count);
+ visitRule((Rule) target, depth, count);
+ } else if (target instanceof PackageGroup) {
+ visitPackageGroup((PackageGroup) target, depth, count);
+ }
+ }
+
+ private void observeEdge(Target from, Attribute attribute, Target to) {
+ for (TargetEdgeObserver observer : observers) {
+ observer.edge(from, attribute, to);
+ }
+ }
+
+ private void observeNode(Target target) {
+ for (TargetEdgeObserver observer : observers) {
+ observer.node(target);
+ }
+ }
+
+ private void observeError(Target from, Label label, NoSuchThingException e) {
+ for (TargetEdgeObserver observer : observers) {
+ observer.missingEdge(from, label, e);
+ }
+ }
+ }
+}