aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Googler <noreply@google.com>2016-11-28 21:54:43 +0000
committerGravatar Irina Iancu <elenairina@google.com>2016-11-29 08:07:08 +0000
commit2b5038831c2514f65841ce4e40f8e4648250bf01 (patch)
tree7f3cda740abdad84307803c3fe5243697f6f3479
parent47f48d2d500e5aa58fbbea8f163067653b0127f1 (diff)
Update ParallelSkyQueryUtils to use QuiescingExecutor instead of ForkJoinPool
for concurrent visitations. During BFS visitation of rdeps and rbuildfiles, it uses a centralized pool (backed by a LinkedBlockingQueue) to store all pending visits, and a periodically running scheduler to schedule tasks for each pending visit. -- MOS_MIGRATED_REVID=140398162
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java5
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java4
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java4
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java335
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java24
5 files changed, 242 insertions, 130 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index 01cdca1d3e..45d1c56c15 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -385,6 +385,11 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
}
+ @Override
+ public long getRemainingTasksCount() {
+ return remainingTasks.get();
+ }
+
/**
* Subclasses may override this to make dynamic decisions about whether to run tasks
* asynchronously versus in-thread.
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java b/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java
index f409bc2d73..93fc3c99f0 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/BlockingStack.java
@@ -22,11 +22,11 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
/** A {@link BlockingQueue} with LIFO (last-in-first-out) ordering. */
-class BlockingStack<E> extends AbstractQueue<E> implements BlockingQueue<E> {
+public class BlockingStack<E> extends AbstractQueue<E> implements BlockingQueue<E> {
// We just restrict to only using the *First methods on the deque, turning it into a stack.
private final BlockingDeque<E> deque;
- BlockingStack() {
+ public BlockingStack() {
this.deque = new LinkedBlockingDeque<>();
}
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
index 65718c6205..78bc93dc10 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/QuiescingExecutor.java
@@ -14,7 +14,6 @@
package com.google.devtools.build.lib.concurrent;
import com.google.common.annotations.VisibleForTesting;
-
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -53,6 +52,9 @@ public interface QuiescingExecutor extends Executor {
*/
void awaitQuiescence(boolean interruptWorkers) throws InterruptedException;
+ /** Return the number of tasks which are not completed (running or waiting to be executed). */
+ long getRemainingTasksCount();
+
/** Get latch that is released if a task throws an exception. Used only in tests. */
@VisibleForTesting
CountDownLatch getExceptionLatchForTestingOnly();
diff --git a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
index 5f7447b198..086ecfc1b8 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/ParallelSkyQueryUtils.java
@@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.query2;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
@@ -22,11 +23,15 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.concurrent.MoreFutures;
+import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
+import com.google.devtools.build.lib.concurrent.BlockingStack;
+import com.google.devtools.build.lib.concurrent.ErrorClassifier;
import com.google.devtools.build.lib.concurrent.MultisetSemaphore;
+import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.packages.Target;
import com.google.devtools.build.lib.query2.engine.Callback;
@@ -45,10 +50,10 @@ import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
-import java.util.concurrent.RecursiveAction;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* Parallel implementations of various functionality in {@link SkyQueryEnvironment}.
@@ -60,6 +65,10 @@ import java.util.concurrent.RecursiveAction;
*/
// TODO(bazel-team): Be more deliberate about bounding memory usage here.
class ParallelSkyQueryUtils {
+
+ /** The maximum number of keys to visit at once. */
+ @VisibleForTesting static final int VISIT_BATCH_SIZE = 10000;
+
private ParallelSkyQueryUtils() {
}
@@ -72,14 +81,13 @@ class ParallelSkyQueryUtils {
QueryExpression expression,
VariableContext<Target> context,
ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool,
MultisetSemaphore<PackageIdentifier> packageSemaphore)
throws QueryException, InterruptedException {
env.eval(
expression,
context,
new SkyKeyBFSVisitorCallback(
- new AllRdepsUnboundedVisitor.Factory(env, callback, forkJoinPool, packageSemaphore)));
+ new AllRdepsUnboundedVisitor.Factory(env, callback, packageSemaphore)));
}
/** Specialized parallel variant of {@link SkyQueryEnvironment#getRBuildFiles}. */
@@ -87,28 +95,24 @@ class ParallelSkyQueryUtils {
SkyQueryEnvironment env,
Collection<PathFragment> fileIdentifiers,
ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool,
MultisetSemaphore<PackageIdentifier> packageSemaphore)
throws QueryException, InterruptedException {
ThreadSafeUniquifier<SkyKey> keyUniquifier = env.createSkyKeyUniquifier();
RBuildFilesVisitor visitor =
- new RBuildFilesVisitor(env, forkJoinPool, keyUniquifier, callback, packageSemaphore);
+ new RBuildFilesVisitor(env, keyUniquifier, callback, packageSemaphore);
visitor.visitAndWaitForCompletion(env.getSkyKeysForFileFragments(fileIdentifiers));
}
/** A helper class that computes 'rbuildfiles(<blah>)' via BFS. */
private static class RBuildFilesVisitor extends AbstractSkyKeyBFSVisitor<SkyKey> {
- private final SkyQueryEnvironment env;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private RBuildFilesVisitor(
SkyQueryEnvironment env,
- ForkJoinPool forkJoinPool,
ThreadSafeUniquifier<SkyKey> uniquifier,
Callback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
- super(forkJoinPool, uniquifier, callback);
- this.env = env;
+ super(env, uniquifier, callback);
this.packageSemaphore = packageSemaphore;
}
@@ -171,17 +175,14 @@ class ParallelSkyQueryUtils {
*/
private static class AllRdepsUnboundedVisitor
extends AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> {
- private final SkyQueryEnvironment env;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
private AllRdepsUnboundedVisitor(
SkyQueryEnvironment env,
- ForkJoinPool forkJoinPool,
ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier,
ThreadSafeCallback<Target> callback,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
- super(forkJoinPool, uniquifier, callback);
- this.env = env;
+ super(env, uniquifier, callback);
this.packageSemaphore = packageSemaphore;
}
@@ -194,7 +195,6 @@ class ParallelSkyQueryUtils {
*/
private static class Factory implements AbstractSkyKeyBFSVisitor.Factory {
private final SkyQueryEnvironment env;
- private final ForkJoinPool forkJoinPool;
private final ThreadSafeUniquifier<Pair<SkyKey, SkyKey>> uniquifier;
private final ThreadSafeCallback<Target> callback;
private final MultisetSemaphore<PackageIdentifier> packageSemaphore;
@@ -202,10 +202,8 @@ class ParallelSkyQueryUtils {
private Factory(
SkyQueryEnvironment env,
ThreadSafeCallback<Target> callback,
- ForkJoinPool forkJoinPool,
MultisetSemaphore<PackageIdentifier> packageSemaphore) {
this.env = env;
- this.forkJoinPool = forkJoinPool;
this.uniquifier = env.createReverseDepSkyKeyUniquifier();
this.callback = callback;
this.packageSemaphore = packageSemaphore;
@@ -213,8 +211,7 @@ class ParallelSkyQueryUtils {
@Override
public AbstractSkyKeyBFSVisitor<Pair<SkyKey, SkyKey>> create() {
- return new AllRdepsUnboundedVisitor(
- env, forkJoinPool, uniquifier, callback, packageSemaphore);
+ return new AllRdepsUnboundedVisitor(env, uniquifier, callback, packageSemaphore);
}
}
@@ -267,29 +264,17 @@ class ParallelSkyQueryUtils {
// recursive visitation.
Map<SkyKey, Iterable<SkyKey>> unfilteredReverseDeps = env.graph.getReverseDeps(filteredKeys);
- // Build a collection of Pairs and group by package id so we can partition them efficiently
- // later.
- ArrayListMultimap<PackageIdentifier, Pair<SkyKey, SkyKey>> rdepsByPackage =
- ArrayListMultimap.create();
+ ImmutableList.Builder<Pair<SkyKey, SkyKey>> builder = ImmutableList.builder();
for (Map.Entry<SkyKey, Iterable<SkyKey>> rdeps : unfilteredReverseDeps.entrySet()) {
for (SkyKey rdep : rdeps.getValue()) {
Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(rdep);
if (label != null) {
- rdepsByPackage.put(label.getPackageIdentifier(), Pair.of(rdeps.getKey(), rdep));
+ builder.add(Pair.of(rdeps.getKey(), rdep));
}
}
}
- // A couple notes here:
- // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we
- // want.
- // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
- // accidentally retaining the entire ArrayListMultimap object.
- Iterable<Pair<SkyKey, SkyKey>> keysToVisit = ImmutableList.copyOf(rdepsByPackage.values());
-
- // TODO(shazh): Use a global pool to store keys to be returned and keys to be processed, and
- // assign them to VisitTasks. It allows us to better optimize package retrieval.
- return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ keysToVisit);
+ return new Visit(/*keysToUseForResult=*/ filteredKeys, /*keysToVisit=*/ builder.build());
}
@Override
@@ -325,6 +310,33 @@ class ParallelSkyQueryUtils {
}
});
}
+
+ @Override
+ protected Iterable<Task> getVisitTasks(Collection<Pair<SkyKey, SkyKey>> pendingKeysToVisit) {
+ // Group pending visits by package.
+ ArrayListMultimap<PackageIdentifier, Pair<SkyKey, SkyKey>> visitsByPackage =
+ ArrayListMultimap.create();
+ for (Pair<SkyKey, SkyKey> visit : pendingKeysToVisit) {
+ Label label = SkyQueryEnvironment.SKYKEY_TO_LABEL.apply(visit.second);
+ if (label != null) {
+ visitsByPackage.put(label.getPackageIdentifier(), visit);
+ }
+ }
+
+ ImmutableList.Builder<Task> builder = ImmutableList.builder();
+
+ // A couple notes here:
+ // (i) ArrayListMultimap#values returns the values grouped by key, which is exactly what we
+ // want.
+ // (ii) ArrayListMultimap#values returns a Collection view, so we make a copy to avoid
+ // accidentally retaining the entire ArrayListMultimap object.
+ for (Iterable<Pair<SkyKey, SkyKey>> keysToVisitBatch :
+ Iterables.partition(ImmutableList.copyOf(visitsByPackage.values()), VISIT_BATCH_SIZE)) {
+ builder.add(new VisitTask(keysToVisitBatch));
+ }
+
+ return builder.build();
+ }
}
/**
@@ -349,28 +361,90 @@ class ParallelSkyQueryUtils {
/**
* A helper class for performing a custom BFS visitation on the Skyframe graph, using {@link
- * ForkJoinPool}.
+ * QuiescingExecutor}.
*
- * <p>The choice of {@link ForkJoinPool} over, say, AbstractQueueVisitor backed by a
- * ThreadPoolExecutor, is very deliberate. {@link SkyKeyBFSVisitorCallback#process} kicks off a
- * visitation and blocks on completion of it. But this visitation may never complete if there are
- * a bounded number of threads in the global thread pool used for query evaluation!
+ * <p>The visitor uses an AbstractQueueVisitor backed by a ThreadPoolExecutor with a thread pool
+ * NOT part of the global query evaluation pool to avoid starvation.
*/
@ThreadSafe
private abstract static class AbstractSkyKeyBFSVisitor<T> {
- private final ForkJoinPool forkJoinPool;
+ protected final SkyQueryEnvironment env;
private final ThreadSafeUniquifier<T> uniquifier;
private final Callback<Target> callback;
- /** The maximum number of keys to visit at once. */
- private static final int VISIT_BATCH_SIZE = 10000;
+
+ private final QuiescingExecutor executor;
+
+ /** A queue to store pending visits. */
+ private final LinkedBlockingQueue<T> processingQueue = new LinkedBlockingQueue<>();
+
+ /**
+ * The max time interval between two scheduling passes in milliseconds. A scheduling pass is
+ * defined as the scheduler thread determining whether to drain all pending visits from the
+ * queue and submitting tasks to perform the visits.
+ *
+ * <p>The choice of 1ms is a result based of experiments. It is an attempted balance due to a
+ * few facts about the scheduling interval:
+ *
+ * <p>1. A large interval adds systematic delay. In an extreme case, a BFS visit which is
+ * supposed to take only 1ms now may take 5ms. For most BFS visits which take longer than a few
+ * hundred milliseconds, it should not be noticeable.
+ *
+ * <p>2. A zero-interval config eats too much CPU.
+ *
+ * <p>Even though the scheduler runs once every 1 ms, it does not try to drain it every time.
+ * Pending visits are drained only certain criteria are met.
+ */
+ private static final long SCHEDULING_INTERVAL_MILLISECONDS = 1;
+
+ /**
+ * The minimum number of pending tasks the scheduler tries to hit. The 3x number is set based on
+ * experiments. We do not want to schedule tasks too frequently to miss the benefits of large
+ * number of keys being grouped by packages. On the other hand, we want to keep all threads in
+ * the pool busy to achieve full capacity. A low number here will cause some of the worker
+ * threads to go idle at times before the next scheduling cycle.
+ *
+ * <p>TODO(shazh): Revisit the choice of task target based on real-prod performance.
+ */
+ private static final long MIN_PENDING_TASKS = 3 * SkyQueryEnvironment.DEFAULT_THREAD_COUNT;
+
+ /**
+ * Fail fast on RuntimeExceptions, including {code RuntimeInterruptedException} and {@code
+ * RuntimeQueryException}, which are resulted from InterruptedException and QueryException.
+ */
+ static final ErrorClassifier SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER =
+ new ErrorClassifier() {
+ @Override
+ protected ErrorClassification classifyException(Exception e) {
+ return (e instanceof RuntimeException)
+ ? ErrorClassification.CRITICAL_AND_LOG
+ : ErrorClassification.NOT_CRITICAL;
+ }
+ };
+
+ /** All BFS visitors share a single global fixed thread pool. */
+ private static final ExecutorService FIXED_THREAD_POOL_EXECUTOR =
+ new ThreadPoolExecutor(
+ // Must be at least 2 worker threads in the pool (1 for the scheduler thread).
+ /*corePoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
+ /*maximumPoolSize=*/ Math.max(2, SkyQueryEnvironment.DEFAULT_THREAD_COUNT),
+ /*keepAliveTime=*/ 1,
+ /*units=*/ TimeUnit.SECONDS,
+ /*workQueue=*/ new BlockingStack<Runnable>(),
+ new ThreadFactoryBuilder().setNameFormat("skykey-bfs-visitor %d").build());
private AbstractSkyKeyBFSVisitor(
- ForkJoinPool forkJoinPool,
- ThreadSafeUniquifier<T> uniquifier,
- Callback<Target> callback) {
- this.forkJoinPool = forkJoinPool;
+ SkyQueryEnvironment env, ThreadSafeUniquifier<T> uniquifier, Callback<Target> callback) {
+ this.env = env;
this.uniquifier = uniquifier;
this.callback = callback;
+ this.executor =
+ new AbstractQueueVisitor(
+ /*concurrent=*/ true,
+ /*executorService=*/ FIXED_THREAD_POOL_EXECUTOR,
+ // Leave the thread pool active for other current and future callers.
+ /*shutdownOnCompletion=*/ false,
+ /*failFastOnException=*/ true,
+ /*errorClassifier=*/ SKYKEY_BFS_VISITOR_ERROR_CLASSIFIER);
}
/** Factory for {@link AbstractSkyKeyBFSVisitor} instances. */
@@ -390,44 +464,103 @@ class ParallelSkyQueryUtils {
void visitAndWaitForCompletion(Iterable<SkyKey> keys)
throws QueryException, InterruptedException {
- Iterable<ForkJoinTask<?>> tasks =
- getTasks(
- new Visit(
- /*keysToUseForResult=*/ ImmutableList.<SkyKey>of(),
- /*keysToVisit=*/ preprocessInitialVisit(keys)));
- for (ForkJoinTask<?> task : tasks) {
- forkJoinPool.execute(task);
- }
+ processingQueue.addAll(ImmutableList.copyOf(preprocessInitialVisit(keys)));
+ // We add the scheduler to the pool, allowing it (as well as any submitted tasks later)
+ // to be failed fast if any QueryException or InterruptedException is received.
+ executor.execute(new Scheduler());
try {
- MoreFutures.waitForAllInterruptiblyFailFast(tasks);
- } catch (ExecutionException ee) {
- Throwable cause = ee.getCause();
- if (cause instanceof RuntimeQueryException) {
- throw (QueryException) cause.getCause();
- } else if (cause instanceof RuntimeInterruptedException) {
- throw (InterruptedException) cause.getCause();
- } else {
- throw new IllegalStateException(cause);
+ executor.awaitQuiescence(true);
+ } catch (RuntimeQueryException e) {
+ throw (QueryException) e.getCause();
+ } catch (RuntimeInterruptedException e) {
+ throw (InterruptedException) e.getCause();
+ }
+ }
+
+ /**
+ * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s in
+ * the full visitation to the given {@link Callback}.
+ */
+ protected abstract void processResultantTargets(
+ Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
+ throws QueryException, InterruptedException;
+
+ /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
+ protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException;
+
+ /** Gets the first {@link Visit} representing the entry-level SkyKeys. */
+ protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys);
+
+ protected Iterable<Task> getVisitTasks(Collection<T> pendingKeysToVisit) {
+ ImmutableList.Builder<Task> builder = ImmutableList.builder();
+ for (Iterable<T> keysToVisitBatch :
+ Iterables.partition(pendingKeysToVisit, VISIT_BATCH_SIZE)) {
+ builder.add(new VisitTask(keysToVisitBatch));
+ }
+
+ return builder.build();
+ }
+
+ private class Scheduler implements Runnable {
+ @Override
+ public void run() {
+ // The scheduler keeps running until both the following two conditions are met.
+ //
+ // 1. There is no pending visit in the queue.
+ // 2. There is no pending task (other than itself) in the pool.
+ if (processingQueue.isEmpty() && executor.getRemainingTasksCount() <= 1) {
+ return;
+ }
+
+ // To achieve maximum efficiency, queue is drained in either of the following 2 conditions:
+ //
+ // 1. The number of pending tasks is low. We schedule new tasks to avoid wasting CPU.
+ // 2. The process queue size is large.
+ if (executor.getRemainingTasksCount() < MIN_PENDING_TASKS
+ || processingQueue.size() >= SkyQueryEnvironment.BATCH_CALLBACK_SIZE) {
+ drainProcessingQueue();
+ }
+
+ try {
+ // Wait at most {@code SCHEDULING_INTERVAL_MILLISECONDS} milliseconds.
+ Thread.sleep(SCHEDULING_INTERVAL_MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeInterruptedException(e);
+ }
+
+ executor.execute(new Scheduler());
+ }
+
+ private void drainProcessingQueue() {
+ Collection<T> pendingKeysToVisit = new ArrayList<>(processingQueue.size());
+ processingQueue.drainTo(pendingKeysToVisit);
+ if (pendingKeysToVisit.isEmpty()) {
+ return;
+ }
+
+ for (Task task : getVisitTasks(pendingKeysToVisit)) {
+ executor.execute(task);
}
}
}
- private abstract static class AbstractInternalRecursiveAction extends RecursiveAction {
- protected abstract void computeImpl() throws QueryException, InterruptedException;
+ abstract static class Task implements Runnable {
@Override
- public final void compute() {
+ public void run() {
try {
- computeImpl();
- } catch (QueryException queryException) {
- throw new RuntimeQueryException(queryException);
- } catch (InterruptedException interruptedException) {
- throw new RuntimeInterruptedException(interruptedException);
+ process();
+ } catch (QueryException e) {
+ throw new RuntimeQueryException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeInterruptedException(e);
}
}
+
+ abstract void process() throws QueryException, InterruptedException;
}
- private class VisitTask extends AbstractInternalRecursiveAction {
+ class VisitTask extends Task {
private final Iterable<T> keysToVisit;
private VisitTask(Iterable<T> keysToVisit) {
@@ -435,22 +568,24 @@ class ParallelSkyQueryUtils {
}
@Override
- protected void computeImpl() throws InterruptedException {
+ void process() throws InterruptedException {
ImmutableList<T> uniqueKeys = uniquifier.unique(keysToVisit);
if (uniqueKeys.isEmpty()) {
return;
}
- Iterable<ForkJoinTask<?>> tasks = getTasks(getVisitResult(uniqueKeys));
- for (ForkJoinTask<?> task : tasks) {
- task.fork();
- }
- for (ForkJoinTask<?> task : tasks) {
- task.join();
+
+ Visit visit = getVisitResult(uniqueKeys);
+ for (Iterable<SkyKey> keysToUseForResultBatch :
+ Iterables.partition(
+ visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) {
+ executor.execute(new GetAndProcessResultsTask(keysToUseForResultBatch));
}
+
+ processingQueue.addAll(ImmutableList.copyOf(visit.keysToVisit));
}
}
- private class GetAndProcessResultsTask extends AbstractInternalRecursiveAction {
+ private class GetAndProcessResultsTask extends Task {
private final Iterable<SkyKey> keysToUseForResult;
private GetAndProcessResultsTask(Iterable<SkyKey> keysToUseForResult) {
@@ -458,42 +593,10 @@ class ParallelSkyQueryUtils {
}
@Override
- protected void computeImpl() throws QueryException, InterruptedException {
+ protected void process() throws QueryException, InterruptedException {
processResultantTargets(keysToUseForResult, callback);
}
}
-
- private Iterable<ForkJoinTask<?>> getTasks(Visit visit) {
- // Split the given visit request into ForkJoinTasks for visiting keys and ForkJoinTasks for
- // getting and outputting results, each of which obeys the separate batch limits.
- // TODO(bazel-team): Attempt to group work on targets within the same package.
- ImmutableList.Builder<ForkJoinTask<?>> tasksBuilder = ImmutableList.builder();
- // Fork the tasks for getting and outputting results first - this way we maximize for
- // throughput to the underlying callback.
- for (Iterable<SkyKey> keysToUseForResultBatch : Iterables.partition(
- visit.keysToUseForResult, SkyQueryEnvironment.BATCH_CALLBACK_SIZE)) {
- tasksBuilder.add(new GetAndProcessResultsTask(keysToUseForResultBatch));
- }
- for (Iterable<T> keysToVisitBatch :
- Iterables.partition(visit.keysToVisit, VISIT_BATCH_SIZE)) {
- tasksBuilder.add(new VisitTask(keysToVisitBatch));
- }
- return tasksBuilder.build();
- }
-
- /**
- * Forwards the given {@code keysToUseForResult}'s contribution to the set of {@link Target}s
- * in the full visitation to the given {@link Callback}.
- */
- protected abstract void processResultantTargets(
- Iterable<SkyKey> keysToUseForResult, Callback<Target> callback)
- throws QueryException, InterruptedException;
-
- /** Gets the {@link Visit} representing the local visitation of the given {@code values}. */
- protected abstract Visit getVisitResult(Iterable<T> values) throws InterruptedException;
-
- /** Gets the first {@link Visit} representing the entry-level SkyKeys. */
- protected abstract Iterable<T> preprocessInitialVisit(Iterable<SkyKey> keys);
}
private static class RuntimeQueryException extends RuntimeException {
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 705738ce84..87a57af2c4 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -853,17 +853,20 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
public Map<SkyKey, Target> makeTargetsFromPackageKeyToTargetKeyMap(
Multimap<SkyKey, SkyKey> packageKeyToTargetKeyMap) throws InterruptedException {
ImmutableMap.Builder<SkyKey, Target> result = ImmutableMap.builder();
+ Set<SkyKey> processedTargets = new HashSet<>();
Map<SkyKey, SkyValue> packageMap = graph.getSuccessfulValues(packageKeyToTargetKeyMap.keySet());
for (Map.Entry<SkyKey, SkyValue> entry : packageMap.entrySet()) {
for (SkyKey targetKey : packageKeyToTargetKeyMap.get(entry.getKey())) {
- try {
- result.put(
- targetKey,
- ((PackageValue) entry.getValue())
- .getPackage()
- .getTarget((SKYKEY_TO_LABEL.apply(targetKey)).getName()));
- } catch (NoSuchTargetException e) {
- // Skip missing target.
+ if (processedTargets.add(targetKey)) {
+ try {
+ result.put(
+ targetKey,
+ ((PackageValue) entry.getValue())
+ .getPackage()
+ .getTarget((SKYKEY_TO_LABEL.apply(targetKey)).getName()));
+ } catch (NoSuchTargetException e) {
+ // Skip missing target.
+ }
}
}
}
@@ -1013,8 +1016,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
ThreadSafeCallback<Target> callback,
ForkJoinPool forkJoinPool)
throws QueryException, InterruptedException {
- ParallelSkyQueryUtils.getRBuildFilesParallel(
- this, fileIdentifiers, callback, forkJoinPool, packageSemaphore);
+ ParallelSkyQueryUtils.getRBuildFilesParallel(this, fileIdentifiers, callback, packageSemaphore);
}
/**
@@ -1199,7 +1201,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
ForkJoinPool forkJoinPool)
throws QueryException, InterruptedException {
ParallelSkyQueryUtils.getAllRdepsUnboundedParallel(
- this, expression, context, callback, forkJoinPool, packageSemaphore);
+ this, expression, context, callback, packageSemaphore);
}
@ThreadSafe