aboutsummaryrefslogtreecommitdiffhomepage
path: root/src
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2017-05-16 05:25:49 +0200
committerGravatar Dmitry Lomov <dslomov@google.com>2017-05-16 15:24:46 +0200
commit7184b6f55a8cb72094a481d056fe89bc7be80c76 (patch)
tree1378cc258d8874356baac702c2b5960d90178588 /src
parent23004957a56d304106d5dbf0d6ea52ec10713b39 (diff)
Have TransitiveTraversalValues store kind of targets which have errors when computing TransitiveTraversalValues.
RELNOTES: None PiperOrigin-RevId: 156138657
Diffstat (limited to 'src')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java314
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java326
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java2
-rw-r--r--src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalFunction.java12
-rw-r--r--src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalValue.java56
5 files changed, 382 insertions, 328 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 07249b50fb..28cdede50e 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -24,16 +24,10 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
-import com.google.devtools.build.lib.concurrent.BlockingStack;
-import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
-import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
-import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.Callback;
import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryTaskFuture;
@@ -51,10 +45,6 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
/**
* Parallel implementations of various functionality in {@link SkyQueryEnvironment}.
@@ -86,7 +76,7 @@ class ParallelSkyQueryUtils {
return env.eval(
expression,
context,
- new SkyKeyBFSVisitorCallback(
+ ParallelVisitor.createParallelVisitorCallback(
new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore)));
}
@@ -104,7 +94,8 @@ class ParallelSkyQueryUtils {
}
/** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */
- private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor<SkyKey> {
+ private static class RBuildFilesVisitor extends ParallelVisitor<SkyKey> {
+ private final SkyQueryEnvironment env;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private RBuildFilesVisitor(
@@ -112,7 +103,8 @@ class ParallelSkyQueryUtils {
Uniquifier<SkyKey> uniquifier,
Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
- super(env, uniquifier, callback);
+ super(uniquifier, callback, VISIT_BATCH_SIZE);
+ this.env = env;
this.packageSemaphore = packageSemaphore;
}
@@ -173,8 +165,8 @@ class ParallelSkyQueryUtils {
* even with 10M edges, the memory overhead is around 160M, and the memory can be reclaimed by
* regular GC.
*/
- private static class AllRdepsUnboundedVisitor
- extends AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> {
+ private static class AllRdepsUnboundedVisitor extends ParallelVisitor<Pair<SkyKey, SkyKey>> {
+ private final SkyQueryEnvironment env;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private AllRdepsUnboundedVisitor(
@@ -182,7 +174,8 @@ class ParallelSkyQueryUtils {
Uniquifier<Pair<SkyKey, SkyKey>> uniquifier,
Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
- super(env, uniquifier, callback);
+ super(uniquifier, callback, VISIT_BATCH_SIZE);
+ this.env = env;
this.packageSemaphore = packageSemaphore;
}
@@ -192,7 +185,7 @@ class ParallelSkyQueryUtils {
* single {@link Callback#process} call. Note that all the created instances share the same
* {@link Uniquifier} so that we don't visit the same Skyframe node more than once.
*/
- private static class Factory implements AbstractSkyKeyBFSVisitor.Factory {
+ private static class Factory implements ParallelVisitor.Factory {
private final SkyQueryEnvironment env;
private final Uniquifier<Pair<SkyKey, SkyKey>> uniquifier;
private final Callback<Target> callback;
@@ -209,7 +202,7 @@ class ParallelSkyQueryUtils {
}
@Override
- public AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> create() {
+ public ParallelVisitor<Pair<SkyKey, SkyKey>> create() {
return new AllRdepsUnboundedVisitor(env, uniquifier, callback, packageSemaphore);
}
}
@@ -337,290 +330,5 @@ class ParallelSkyQueryUtils {
return builder.build();
}
}
-
- /**
- * A {@link Callback} whose {@link Callback#process} method kicks off a BFS visitation via a fresh
- * {@link AbstractSkyKeyBFSVisitor} instance.
- */
- private static class SkyKeyBFSVisitorCallback implements Callback<Target> {
- private final AbstractSkyKeyBFSVisitor.Factory visitorFactory;
-
- private SkyKeyBFSVisitorCallback(AbstractSkyKeyBFSVisitor.Factory visitorFactory) {
- this.visitorFactory = visitorFactory;
- }
-
- @Override
- public void process(Iterable<Target> partialResult)
- throws QueryException, InterruptedException {
- AbstractSkyKeyBFSVisitor<?> visitor = visitorFactory.create();
- // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on
- // another, potentially expensive computation. Refactor to something like "processAsync".
- visitor.visitAndWaitForCompletion(
- SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult));
- }
- }
-
- /**
- * A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link
- * QuiescingExecutor}.
- *
- * <p>The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool
- * NOT part of the global query evaluation pool to avoid starvation.
- */
- @ThreadSafe
- private abstract static class AbstractSkyKeyBFSVisitor<T> {
- protected final SkyQueryEnvironment env;
- private final Uniquifier<T> uniquifier;
- private final Callback<Target> callback;
-
- private final BFSVisitingTaskExecutor executor;
-
- /** A queue to store pending visits. */
- private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>();
-
- /**
- * The max time interval between two scheduling passes in milliseconds. A scheduling pass is
- * defined as the scheduler thread determining whether to drain all pending visits from the
- * queue and submitting tasks to perform the visits.
- *
- * <p>The choice of 1ms is a result based of experiments. It is an attempted balance due to a
- * few facts about the scheduling interval:
- *
- * <p>1. A large interval adds systematic delay. In an extreme case, a BFS visit which is
- * supposed to take only 1ms now may take 5ms. For most BFS visits which take longer than a few
- * hundred milliseconds, it should not be noticeable.
- *
- * <p>2. A zero-interval config eats too much CPU.
- *
- * <p>Even though the scheduler runs once every 1 ms, it does not try to drain it every time.
- * Pending visits are drained only certain criteria are met.
- */
- private static final long SCHEDULING_INTERVAL_MILLISECONDS = 1;
-
- /**
- * The minimum number of pending tasks the scheduler tries to hit. The 3x number is set based on
- * experiments. We do not want to schedule tasks too frequently to miss the benefits of large
- * number of keys being grouped by packages. On the other hand, we want to keep all threads in
- * the pool busy to achieve full capacity. A low number here will cause some of the worker
- * threads to go idle at times before the next scheduling cycle.
- *
- * <p>TODO(shazh): Revisit the choice of task target based on real-prod performance.
- */
- private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT;
-
- /**
- * Fail fast on RuntimeExceptions, including {code RuntimeInterruptedException} and {@code
- * RuntimeQueryException}, which are resulted from InterruptedException and QueryException.
- */
- static final ErrorClassifier SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER =
- new ErrorClassifier() {
- @Override
- protected ErrorClassification classifyException(Exception e) {
- return (e instanceof RuntimeException)
- ? ErrorClassification.CRITICAL_AND_LOG
- : ErrorClassification.NOT_CRITICAL;
- }
- };
-
- /** All BFS visitors share a single global fixed thread pool. */
- private static final ExecutorService FIXED_THREAD_POOL_EXECUTOR =
- new ThreadPoolExecutor(
- // Must be at least 2 worker threads in the pool (1 for the scheduler thread).
- /*corePoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
- /*maximumPoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
- /*keepAliveTime=*/ 1,
- /*units=*/ TimeUnit.SECONDS,
- /*workQueue=*/ new BlockingStack<Runnable>(),
- new ThreadFactoryBuilder().setNameFormat("skykey-bfs-visitor %d").build());
-
- private AbstractSkyKeyBFSVisitor(
- SkyQueryEnvironment env, Uniquifier<T> uniquifier, Callback<Target> callback) {
- this.env = env;
- this.uniquifier = uniquifier;
- this.callback = callback;
- this.executor =
- new BFSVisitingTaskExecutor(
- FIXED_THREAD_POOL_EXECUTOR, SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER);
- }
-
- /** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */
- private static interface Factory {
- AbstractSkyKeyBFSVisitor<?> create();
- }
-
- protected final class Visit {
- private final Iterable<SkyKey> keysToUseForResult;
- private final Iterable<T> keysToVisit;
-
- private Visit(Iterable<SkyKey> keysToUseForResult, Iterable<T> keysToVisit) {
- this.keysToUseForResult = keysToUseForResult;
- this.keysToVisit = keysToVisit;
- }
- }
-
- void visitAndWaitForCompletion(Iterable<SkyKey> keys)
- throws QueryException, InterruptedException {
- processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys)));
- executor.bfsVisitAndWaitForCompletion();
- }
-
- /**
- * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in
- * the full visitation to the given {@link Callback}.
- */
- protected abstract void processResultantTargets(
- Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
- throws QueryException, InterruptedException;
-
- /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
- protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException;
-
- /** Gets the first {@link Visit} representing the entry-level SkyKeys. */
- protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys);
-
- protected Iterable<Task> getVisitTasks(Collection<T> pendingKeysToVisit) {
- ImmutableList.Builder<Task> builder = ImmutableList.builder();
- for (Iterable<T> keysToVisitBatch :
- Iterables.partition(pendingKeysToVisit, VISIT_BATCH_SIZE)) {
- builder.add(new VisitTask(keysToVisitBatch));
- }
-
- return builder.build();
- }
-
- abstract static class Task implements Runnable {
-
- @Override
- public void run() {
- try {
- process();
- } catch (QueryException e) {
- throw new RuntimeQueryException(e);
- } catch (InterruptedException e) {
- throw new RuntimeInterruptedException(e);
- }
- }
-
- abstract void process() throws QueryException, InterruptedException;
- }
-
- class VisitTask extends Task {
- private final Iterable<T> keysToVisit;
-
- private VisitTask(Iterable<T> keysToVisit) {
- this.keysToVisit = keysToVisit;
- }
-
- @Override
- void process() throws InterruptedException {
- ImmutableList<T> uniqueKeys = uniquifier.unique(keysToVisit);
- if (uniqueKeys.isEmpty()) {
- return;
- }
-
- Visit visit = getVisitResult(uniqueKeys);
- for (Iterable<SkyKey> keysToUseForResultBatch :
- Iterables.partition(
- visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) {
- executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch));
- }
-
- processingQueue.addAll(ImmutableList.copyOf(visit.keysToVisit));
- }
- }
-
- private class GetAndProcessResultsTask extends Task {
- private final Iterable<SkyKey> keysToUseForResult;
-
- private GetAndProcessResultsTask(Iterable<SkyKey> keysToUseForResult) {
- this.keysToUseForResult = keysToUseForResult;
- }
-
- @Override
- protected void process() throws QueryException, InterruptedException {
- processResultantTargets(keysToUseForResult, callback);
- }
- }
-
- /**
- * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and
- * scheduler for parallel BFS visitations.
- */
- private class BFSVisitingTaskExecutor extends AbstractQueueVisitor {
- private BFSVisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) {
- super(
- /*executorService=*/ executor,
- // Leave the thread pool active for other current and future callers.
- /*shutdownOnCompletion=*/ false,
- /*failFastOnException=*/ true,
- /*errorClassifier=*/ errorClassifier);
- }
-
- private void bfsVisitAndWaitForCompletion() throws QueryException, InterruptedException {
- // The scheduler keeps running until either of the following two conditions are met.
- //
- // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail
- // fast.
- // 2. There is no pending visit in the queue and no pending task running.
- while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) {
- // To achieve maximum efficiency, queue is drained in either of the following two
- // conditions:
- //
- // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
- // 2. The process queue size is large.
- if (getTaskCount() < MIN_PENDING_TASKS
- || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
-
- Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
- processingQueue.drainTo(pendingKeysToVisit);
- for (Task task : getVisitTasks(pendingKeysToVisit)) {
- execute(task);
- }
- }
-
- try {
- Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
- } catch (InterruptedException e) {
- // If the main thread waiting for completion of the visitation is interrupted, we should
- // gracefully terminate all running and pending tasks before exit. If QueryException
- // occured in any of the worker thread, awaitTerminationAndPropagateErrorsIfAny
- // propagates the QueryException instead of InterruptedException.
- setInterrupted();
- awaitTerminationAndPropagateErrorsIfAny();
- throw e;
- }
- }
-
- // We reach here either because the visitation is complete, or because an error prevents us
- // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either
- // gracefully exit if the visitation is complete, or propagate the exception if error
- // occurred.
- awaitTerminationAndPropagateErrorsIfAny();
- }
-
- private void awaitTerminationAndPropagateErrorsIfAny()
- throws QueryException, InterruptedException {
- try {
- awaitTermination(/*interruptWorkers=*/ true);
- } catch (RuntimeQueryException e) {
- throw (QueryException) e.getCause();
- } catch (RuntimeInterruptedException e) {
- throw (InterruptedException) e.getCause();
- }
- }
- }
- }
-
- private static class RuntimeQueryException extends RuntimeException {
- private RuntimeQueryException(QueryException queryException) {
- super(queryException);
- }
- }
-
- private static class RuntimeInterruptedException extends RuntimeException {
- private RuntimeInterruptedException(InterruptedException interruptedException) {
- super(interruptedException);
- }
- }
}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java
new file mode 100644
index 0000000000..73e291e118
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelVisitor.java
@@ -0,0 +1,326 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.BlockingStack;
+import com.google.devtools.build.lib.concurrent.ErrorClassifier;
+import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
+import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.build.lib.packages.Target;
+import com.google.devtools.build.lib.query2.engine.Callback;
+import com.google.devtools.build.lib.query2.engine.QueryException;
+import com.google.devtools.build.lib.query2.engine.Uniquifier;
+import com.google.devtools.build.skyframe.SkyKey;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A helper class for performing a custom visitation on the Skyframe graph, using {@link
+ * QuiescingExecutor}.
+ *
+ * <p>The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool NOT
+ * part of the global query evaluation pool to avoid starvation.
+ */
+@ThreadSafe
+public abstract class ParallelVisitor<T> {
+ private final Uniquifier<T> uniquifier;
+ private final Callback<Target> callback;
+ private final int visitBatchSize;
+
+ private final VisitingTaskExecutor executor;
+
+ /** A queue to store pending visits. */
+ private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>();
+
+ /**
+ * The max time interval between two scheduling passes in milliseconds. A scheduling pass is
+ * defined as the scheduler thread determining whether to drain all pending visits from the queue
+ * and submitting tasks to perform the visits.
+ *
+ * <p>The choice of 1ms is a result based of experiments. It is an attempted balance due to a few
+ * facts about the scheduling interval:
+ *
+ * <p>1. A large interval adds systematic delay. In an extreme case, a visit which is supposed to
+ * take only 1ms now may take 5ms. For most visits which take longer than a few hundred
+ * milliseconds, it should not be noticeable.
+ *
+ * <p>2. A zero-interval config eats too much CPU.
+ *
+ * <p>Even though the scheduler runs once every 1 ms, it does not try to drain it every time.
+ * Pending visits are drained only certain criteria are met.
+ */
+ private static final long SCHEDULING_INTERVAL_MILLISECONDS = 1;
+
+ /**
+ * The minimum number of pending tasks the scheduler tries to hit. The 3x number is set based on
+ * experiments. We do not want to schedule tasks too frequently to miss the benefits of large
+ * number of keys being grouped by packages. On the other hand, we want to keep all threads in the
+ * pool busy to achieve full capacity. A low number here will cause some of the worker threads to
+ * go idle at times before the next scheduling cycle.
+ *
+ * <p>TODO(shazh): Revisit the choice of task target based on real-prod performance.
+ */
+ private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT;
+
+ /**
+ * Fail fast on RuntimeExceptions, including {code RuntimeInterruptedException} and {@code
+ * RuntimeQueryException}, which are resulted from InterruptedException and QueryException.
+ */
+ static final ErrorClassifier PARALLEL_VISITOR_ERROR_CLASSIFIER =
+ new ErrorClassifier() {
+ @Override
+ protected ErrorClassification classifyException(Exception e) {
+ return (e instanceof RuntimeException)
+ ? ErrorClassification.CRITICAL_AND_LOG
+ : ErrorClassification.NOT_CRITICAL;
+ }
+ };
+
+ /** All visitors share a single global fixed thread pool. */
+ private static final ExecutorService FIXED_THREAD_POOL_EXECUTOR =
+ new ThreadPoolExecutor(
+ /*corePoolSize=*/ Math.max(1, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
+ /*maximumPoolSize=*/ Math.max(1, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
+ /*keepAliveTime=*/ 1,
+ /*units=*/ TimeUnit.SECONDS,
+ /*workQueue=*/ new BlockingStack<Runnable>(),
+ new ThreadFactoryBuilder().setNameFormat("parallel-visitor %d").build());
+
+ protected ParallelVisitor(
+ Uniquifier<T> uniquifier, Callback<Target> callback, int visitBatchSize) {
+ this.uniquifier = uniquifier;
+ this.callback = callback;
+ this.visitBatchSize = visitBatchSize;
+ this.executor =
+ new VisitingTaskExecutor(FIXED_THREAD_POOL_EXECUTOR, PARALLEL_VISITOR_ERROR_CLASSIFIER);
+ }
+
+ /** Factory for {@link ParallelVisitor} instances. */
+ public static interface Factory {
+ ParallelVisitor<?> create();
+ }
+
+ /**
+ * Returns a {@link Callback} which kicks off a parallel visitation when {@link Callback#process}
+ * is invoked.
+ */
+ public static Callback<Target> createParallelVisitorCallback(Factory visitorFactory) {
+ return new ParallelVisitorCallback(visitorFactory);
+ }
+
+ /** An object to hold keys to visit and keys ready for processing. */
+ protected final class Visit {
+ private final Iterable<SkyKey> keysToUseForResult;
+ private final Iterable<T> keysToVisit;
+
+ public Visit(Iterable<SkyKey> keysToUseForResult, Iterable<T> keysToVisit) {
+ this.keysToUseForResult = keysToUseForResult;
+ this.keysToVisit = keysToVisit;
+ }
+ }
+
+ void visitAndWaitForCompletion(Iterable<SkyKey> keys)
+ throws QueryException, InterruptedException {
+ processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys)));
+ executor.visitAndWaitForCompletion();
+ }
+
+ /**
+ * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in
+ * the full visitation to the given {@link Callback}.
+ */
+ protected abstract void processResultantTargets(
+ Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+ throws QueryException, InterruptedException;
+
+ /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
+ protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException;
+
+ /** Gets the first {@link Visit} representing the entry-level SkyKeys. */
+ protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys);
+
+ /** Gets tasks to visit pending keys. */
+ protected Iterable<Task> getVisitTasks(Collection<T> pendingKeysToVisit) {
+ ImmutableList.Builder<Task> builder = ImmutableList.builder();
+ for (Iterable<T> keysToVisitBatch : Iterables.partition(pendingKeysToVisit, visitBatchSize)) {
+ builder.add(new VisitTask(keysToVisitBatch));
+ }
+
+ return builder.build();
+ }
+
+ /** A {@link Runnable} which handles {@link QueryException} and {@link InterruptedException}. */
+ protected abstract static class Task implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ process();
+ } catch (QueryException e) {
+ throw new RuntimeQueryException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeInterruptedException(e);
+ }
+ }
+
+ abstract void process() throws QueryException, InterruptedException;
+ }
+
+ class VisitTask extends Task {
+ private final Iterable<T> keysToVisit;
+
+ VisitTask(Iterable<T> keysToVisit) {
+ this.keysToVisit = keysToVisit;
+ }
+
+ @Override
+ void process() throws InterruptedException {
+ ImmutableList<T> uniqueKeys = uniquifier.unique(keysToVisit);
+ if (uniqueKeys.isEmpty()) {
+ return;
+ }
+
+ Visit visit = getVisitResult(uniqueKeys);
+ for (Iterable<SkyKey> keysToUseForResultBatch :
+ Iterables.partition(visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) {
+ executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch));
+ }
+
+ processingQueue.addAll(ImmutableList.copyOf(visit.keysToVisit));
+ }
+ }
+
+ private class GetAndProcessResultsTask extends Task {
+ private final Iterable<SkyKey> keysToUseForResult;
+
+ private GetAndProcessResultsTask(Iterable<SkyKey> keysToUseForResult) {
+ this.keysToUseForResult = keysToUseForResult;
+ }
+
+ @Override
+ protected void process() throws QueryException, InterruptedException {
+ processResultantTargets(keysToUseForResult, callback);
+ }
+ }
+
+ /**
+ * A custom implementation of {@link QuiescingExecutor} which uses a centralized queue and
+ * scheduler for parallel visitations.
+ */
+ private class VisitingTaskExecutor extends AbstractQueueVisitor {
+ private VisitingTaskExecutor(ExecutorService executor, ErrorClassifier errorClassifier) {
+ super(
+ /*executorService=*/ executor,
+ // Leave the thread pool active for other current and future callers.
+ /*shutdownOnCompletion=*/ false,
+ /*failFastOnException=*/ true,
+ /*errorClassifier=*/ errorClassifier);
+ }
+
+ private void visitAndWaitForCompletion() throws QueryException, InterruptedException {
+ // The scheduler keeps running until either of the following two conditions are met.
+ //
+ // 1. Errors (QueryException or InterruptedException) occurred and visitations should fail
+ // fast.
+ // 2. There is no pending visit in the queue and no pending task running.
+ while (!mustJobsBeStopped() && (!processingQueue.isEmpty() || getTaskCount() > 0)) {
+ // To achieve maximum efficiency, queue is drained in either of the following two
+ // conditions:
+ //
+ // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
+ // 2. The process queue size is large.
+ if (getTaskCount() < MIN_PENDING_TASKS
+ || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
+
+ Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
+ processingQueue.drainTo(pendingKeysToVisit);
+ for (Task task : getVisitTasks(pendingKeysToVisit)) {
+ execute(task);
+ }
+ }
+
+ try {
+ Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
+ } catch (InterruptedException e) {
+ // If the main thread waiting for completion of the visitation is interrupted, we should
+ // gracefully terminate all running and pending tasks before exit. If QueryException
+ // occured in any of the worker thread, awaitTerminationAndPropagateErrorsIfAny
+ // propagates the QueryException instead of InterruptedException.
+ setInterrupted();
+ awaitTerminationAndPropagateErrorsIfAny();
+ throw e;
+ }
+ }
+
+ // We reach here either because the visitation is complete, or because an error prevents us
+ // from proceeding with the visitation. awaitTerminationAndPropagateErrorsIfAny will either
+ // gracefully exit if the visitation is complete, or propagate the exception if error
+ // occurred.
+ awaitTerminationAndPropagateErrorsIfAny();
+ }
+
+ private void awaitTerminationAndPropagateErrorsIfAny()
+ throws QueryException, InterruptedException {
+ try {
+ awaitTermination(/*interruptWorkers=*/ true);
+ } catch (RuntimeQueryException e) {
+ throw (QueryException) e.getCause();
+ } catch (RuntimeInterruptedException e) {
+ throw (InterruptedException) e.getCause();
+ }
+ }
+ }
+
+ /**
+ * A {@link Callback} whose {@link Callback#process} method kicks off a visitation via a fresh
+ * {@link ParallelVisitor} instance.
+ */
+ private static class ParallelVisitorCallback implements Callback<Target> {
+ private final ParallelVisitor.Factory visitorFactory;
+
+ private ParallelVisitorCallback(ParallelVisitor.Factory visitorFactory) {
+ this.visitorFactory = visitorFactory;
+ }
+
+ @Override
+ public void process(Iterable<Target> partialResult)
+ throws QueryException, InterruptedException {
+ ParallelVisitor<?> visitor = visitorFactory.create();
+ // TODO(nharmata): It's not ideal to have an operation like this in #process that blocks on
+ // another, potentially expensive computation. Refactor to something like "processAsync".
+ visitor.visitAndWaitForCompletion(
+ SkyQueryEnvironment.makeTransitiveTraversalKeysStrict(partialResult));
+ }
+ }
+
+ private static class RuntimeQueryException extends RuntimeException {
+ private RuntimeQueryException(QueryException queryException) {
+ super(queryException);
+ }
+ }
+
+ private static class RuntimeInterruptedException extends RuntimeException {
+ private RuntimeInterruptedException(InterruptedException interruptedException) {
+ super(interruptedException);
+ }
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 631afcb002..551844e962 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -669,7 +669,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
}
@ThreadSafe
- Uniquifier<SkyKey> createSkyKeyUniquifier() {
+ protected Uniquifier<SkyKey> createSkyKeyUniquifier() {
return new UniquifierImpl<>(SkyKeyKeyExtractor.INSTANCE, DEFAULT_THREAD_COUNT);
}
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalFunction.java b/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalFunction.java
index 4f11095490..76730690b8 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalFunction.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalFunction.java
@@ -81,8 +81,13 @@ public class TransitiveTraversalFunction
}
}
- protected Collection<Label> getAspectLabels(Rule fromRule, Attribute attr, Label toLabel,
- ValueOrException2<NoSuchPackageException, NoSuchTargetException> toVal, Environment env) {
+ @Override
+ protected Collection<Label> getAspectLabels(
+ Rule fromRule,
+ Attribute attr,
+ Label toLabel,
+ ValueOrException2<NoSuchPackageException, NoSuchTargetException> toVal,
+ Environment env) {
try {
if (toVal == null) {
return ImmutableList.of();
@@ -110,7 +115,8 @@ public class TransitiveTraversalFunction
String firstErrorMessage = accumulator.getFirstErrorMessage();
return targetLoadedSuccessfully
? TransitiveTraversalValue.forTarget(targetAndErrorIfAny.getTarget(), firstErrorMessage)
- : TransitiveTraversalValue.unsuccessfulTransitiveTraversal(firstErrorMessage);
+ : TransitiveTraversalValue.unsuccessfulTransitiveTraversal(
+ firstErrorMessage, targetAndErrorIfAny.getTarget());
}
@Override
diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalValue.java b/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalValue.java
index afc26d3568..4639868b76 100644
--- a/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalValue.java
+++ b/src/main/java/com/google/devtools/build/lib/skyframe/TransitiveTraversalValue.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.skyframe;
+import com.google.common.base.MoreObjects;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
@@ -52,8 +53,16 @@ public abstract class TransitiveTraversalValue implements SkyValue {
private static final InternerWithPresenceCheck<TransitiveTraversalValue> VALUE_INTERNER =
new InternerWithPresenceCheck<>();
- static TransitiveTraversalValue unsuccessfulTransitiveTraversal(String firstErrorMessage) {
- return new TransitiveTraversalValueWithError(Preconditions.checkNotNull(firstErrorMessage));
+ private final String kind;
+
+ protected TransitiveTraversalValue(String kind) {
+ this.kind = Preconditions.checkNotNull(kind);
+ }
+
+ static TransitiveTraversalValue unsuccessfulTransitiveTraversal(
+ String firstErrorMessage, Target target) {
+ return new TransitiveTraversalValueWithError(
+ Preconditions.checkNotNull(firstErrorMessage), target.getTargetKind());
}
static TransitiveTraversalValue forTarget(Target target, @Nullable String firstErrorMessage) {
@@ -84,16 +93,16 @@ public abstract class TransitiveTraversalValue implements SkyValue {
return value;
}
} else {
- return new TransitiveTraversalValueWithError(firstErrorMessage);
+ return new TransitiveTraversalValueWithError(firstErrorMessage, target.getTargetKind());
}
}
public static TransitiveTraversalValue create(
- AdvertisedProviderSet providers, @Nullable String kind, @Nullable String firstErrorMessage) {
+ AdvertisedProviderSet providers, String kind, @Nullable String firstErrorMessage) {
TransitiveTraversalValue value =
firstErrorMessage == null
? new TransitiveTraversalValueWithoutError(providers, kind)
- : new TransitiveTraversalValueWithError(firstErrorMessage);
+ : new TransitiveTraversalValueWithError(firstErrorMessage, kind);
if (firstErrorMessage == null) {
TransitiveTraversalValue oldValue = VALUE_INTERNER.getCanonical(value);
return oldValue == null ? value : oldValue;
@@ -110,9 +119,10 @@ public abstract class TransitiveTraversalValue implements SkyValue {
*/
public abstract AdvertisedProviderSet getProviders();
- /** Returns the target kind, if any. */
- @Nullable
- public abstract String getKind();
+ /** Returns the target kind. */
+ public String getKind() {
+ return kind;
+ }
/**
* Returns the first error message, if any, from loading the target and its transitive
@@ -149,12 +159,11 @@ public abstract class TransitiveTraversalValue implements SkyValue {
/** A transitive target reference without error. */
public static final class TransitiveTraversalValueWithoutError extends TransitiveTraversalValue {
private final AdvertisedProviderSet advertisedProviders;
- @Nullable private final String kind;
private TransitiveTraversalValueWithoutError(
AdvertisedProviderSet providers, @Nullable String kind) {
+ super(kind);
this.advertisedProviders = Preconditions.checkNotNull(providers);
- this.kind = kind;
}
@Override
@@ -169,14 +178,16 @@ public abstract class TransitiveTraversalValue implements SkyValue {
@Override
@Nullable
- public String getKind() {
- return kind;
+ public String getFirstErrorMessage() {
+ return null;
}
@Override
- @Nullable
- public String getFirstErrorMessage() {
- return null;
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("kind", getKind())
+ .add("providers", advertisedProviders)
+ .toString();
}
}
@@ -184,7 +195,8 @@ public abstract class TransitiveTraversalValue implements SkyValue {
public static final class TransitiveTraversalValueWithError extends TransitiveTraversalValue {
private final String firstErrorMessage;
- private TransitiveTraversalValueWithError(String firstErrorMessage) {
+ private TransitiveTraversalValueWithError(String firstErrorMessage, String kind) {
+ super(kind);
this.firstErrorMessage =
StringCanonicalizer.intern(Preconditions.checkNotNull(firstErrorMessage));
}
@@ -201,14 +213,16 @@ public abstract class TransitiveTraversalValue implements SkyValue {
@Override
@Nullable
- public String getKind() {
- return null;
+ public String getFirstErrorMessage() {
+ return firstErrorMessage;
}
@Override
- @Nullable
- public String getFirstErrorMessage() {
- return firstErrorMessage;
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("error", firstErrorMessage)
+ .add("kind", getKind())
+ .toString();
}
}
}