diff options
author | 2016-09-21 17:17:33 +0000 | |
---|---|---|
committer | 2016-09-22 09:52:33 +0000 | |
commit | 2e2b459aaff7e33d9a9fce6a89ef32634b99f439 (patch) | |
tree | 34809e36954745de09995c1401efd87d39144204 /src/main/java/com/google/devtools | |
parent | e67731a2444b08e0b84284073b4f463b733062fd (diff) |
Use ForkJoinPool, rather than ListeningExecutorService, for parallel query evaluation in SkyQueryEnvironment. FJP is nicer to program against, imo.
--
MOS_MIGRATED_REVID=133844508
Diffstat (limited to 'src/main/java/com/google/devtools')
17 files changed, 113 insertions, 116 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java index 3077b689a0..d25717b353 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/RBuildFilesFunction.java @@ -16,7 +16,6 @@ package com.google.devtools.build.lib.query2; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.packages.Target; import com.google.devtools.build.lib.query2.engine.Callback; import com.google.devtools.build.lib.query2.engine.QueryEnvironment; @@ -30,6 +29,7 @@ import com.google.devtools.build.lib.query2.engine.VariableContext; import com.google.devtools.build.lib.vfs.PathFragment; import java.util.List; +import java.util.concurrent.ForkJoinPool; /** * An "rbuildfiles" query expression, which computes the set of packages (as represented by their @@ -90,7 +90,7 @@ public class RBuildFilesFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 4de4f87719..39385ddd78 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -28,16 +28,13 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.LabelSyntaxException; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPattern; import com.google.devtools.build.lib.collect.CompactHashSet; -import com.google.devtools.build.lib.concurrent.ExecutorUtil; +import com.google.devtools.build.lib.concurrent.NamedForkJoinPool; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; @@ -103,7 +100,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -136,14 +133,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> private final List<String> universeScope; protected final String parserPrefix; private final PathPackageLocator pkgPath; - - // Note that the executor returned by Executors.newFixedThreadPool doesn't start any threads - // unless work is submitted to it. - private final ListeningExecutorService threadPool = - MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool( - DEFAULT_THREAD_COUNT, - new ThreadFactoryBuilder().setNameFormat("QueryEnvironment-%d").build())); + private final ForkJoinPool forkJoinPool; // The following fields are set in the #beforeEvaluateQuery method. protected WalkableGraph graph; @@ -161,6 +151,34 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> WalkableGraphFactory graphFactory, List<String> universeScope, PathPackageLocator pkgPath) { + this( + keepGoing, + loadingPhaseThreads, + // SkyQueryEnvironment operates on a prepopulated Skyframe graph. Therefore, query + // evaluation is completely CPU-bound. + /*queryEvaluationParallelismLevel=*/ DEFAULT_THREAD_COUNT, + eventHandler, + settings, + extraFunctions, + evalListener, + parserPrefix, + graphFactory, + universeScope, + pkgPath); + } + + protected SkyQueryEnvironment( + boolean keepGoing, + int loadingPhaseThreads, + int queryEvaluationParallelismLevel, + EventHandler eventHandler, + Set<Setting> settings, + Iterable<QueryFunction> extraFunctions, + QueryExpressionEvalListener<Target> evalListener, + String parserPrefix, + WalkableGraphFactory graphFactory, + List<String> universeScope, + PathPackageLocator pkgPath) { super( keepGoing, /*strictScope=*/ true, @@ -170,6 +188,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> extraFunctions, evalListener); this.loadingPhaseThreads = loadingPhaseThreads; + // Note that ForkJoinPool doesn't start any thread until work is submitted to it. + this.forkJoinPool = NamedForkJoinPool.newNamedPool( + "QueryEnvironment", queryEvaluationParallelismLevel); this.graphFactory = graphFactory; this.pkgPath = pkgPath; this.universeScope = Preconditions.checkNotNull(universeScope); @@ -201,7 +222,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> graphBackedRecursivePackageProvider, eventHandler, TargetPatternEvaluator.DEFAULT_FILTERING_POLICY, - threadPool); + forkJoinPool); } /** @@ -265,7 +286,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> @Override public void close() { - ExecutorUtil.interruptibleShutdown(threadPool); + forkJoinPool.shutdownNow(); } @Override @@ -505,7 +526,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for // all QueryEnvironment implementations. if (callback instanceof ThreadSafeCallback) { - expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, threadPool); + expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, forkJoinPool); } else { expr.eval(this, context, callback); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java index b491097eb7..adc12d278f 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllPathsFunction.java @@ -18,7 +18,6 @@ import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; @@ -27,6 +26,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.ForkJoinPool; /** * Implementation of the <code>allpaths()</code> function. @@ -89,7 +89,7 @@ public class AllPathsFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java index 0da05aaa1d..faa9977967 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/AllRdepsFunction.java @@ -17,13 +17,13 @@ import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ForkJoinPool; /** * An "allrdeps" query expression, which computes the reverse dependencies of the argument within @@ -111,7 +111,7 @@ public class AllRdepsFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java index 0b6a595487..81f20d4a74 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BuildFilesFunction.java @@ -15,7 +15,6 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.collect.CompactHashSet; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; @@ -23,6 +22,7 @@ import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunctio import java.util.List; import java.util.Set; +import java.util.concurrent.ForkJoinPool; /** * A buildfiles(x) query expression, which computes the set of BUILD files and @@ -72,7 +72,7 @@ class BuildFilesFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java index 5de84a195c..482b4cd89b 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/DepsFunction.java @@ -15,7 +15,6 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; @@ -23,6 +22,7 @@ import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunctio import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.ForkJoinPool; /** * A "deps" query expression, which computes the dependencies of the argument. An optional @@ -92,7 +92,7 @@ final class DepsFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java index 5a918cb889..6c9ae89d68 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LabelsFunction.java @@ -14,13 +14,13 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ForkJoinPool; /** * A label(attr_name, argument) expression, which computes the set of targets @@ -89,7 +89,7 @@ class LabelsFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java index 5370b4a7d3..509f2ffdfa 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/LoadFilesFunction.java @@ -15,11 +15,11 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.collect.CompactHashSet; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import java.util.List; import java.util.Set; +import java.util.concurrent.ForkJoinPool; /** * A loadfiles(x) query expression, which computes the set of .bzl files @@ -72,7 +72,7 @@ class LoadFilesFunction implements QueryEnvironment.QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java index f415b41a1a..5cf7c6efbc 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryEnvironment.java @@ -14,10 +14,10 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.ListeningExecutorService; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.concurrent.ForkJoinPool; import javax.annotation.Nonnull; /** @@ -124,7 +124,7 @@ public interface QueryEnvironment<T> { /** * Same as {@link #eval(QueryEnvironment, VariableContext, QueryExpression, List, Callback)}, - * except that this {@link QueryFunction} may use {@code executorService} to achieve + * except that this {@link QueryFunction} may use {@code forkJoinPool} to achieve * parallelism. * * <p>The caller must ensure that {@code env} is thread safe. @@ -135,7 +135,7 @@ public interface QueryEnvironment<T> { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException; + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException; } /** diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java index 667fcecf27..e35e9e4807 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpression.java @@ -13,10 +13,10 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import java.util.Collection; +import java.util.concurrent.ForkJoinPool; /** * Base class for expressions in the Blaze query language, revision 2. @@ -86,7 +86,7 @@ public abstract class QueryExpression { /** * Evaluates this query in the specified environment, as in - * {@link #eval(QueryEnvironment, VariableContext, Callback)}, using {@code executorService} to + * {@link #eval(QueryEnvironment, VariableContext, Callback)}, using {@code forkJoinPool} to * achieve parallelism. * * <p>The caller must ensure that {@code env} is thread safe. @@ -96,17 +96,17 @@ public abstract class QueryExpression { QueryEnvironment<T> env, VariableContext<T> context, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - env.getEvalListener().onParEval(this, env, context, callback, executorService); - parEvalImpl(env, context, callback, executorService); + env.getEvalListener().onParEval(this, env, context, callback, forkJoinPool); + parEvalImpl(env, context, callback, forkJoinPool); } protected <T> void parEvalImpl( QueryEnvironment<T> env, VariableContext<T> context, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { evalImpl(env, context, callback); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java index 7255a47dd7..e6bdaef7a9 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryExpressionEvalListener.java @@ -13,8 +13,8 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import java.util.concurrent.ForkJoinPool; /** Listener for calls to the internal methods of {@link QueryExpression} used for evaluation. */ @ThreadSafe @@ -32,7 +32,7 @@ public interface QueryExpressionEvalListener<T> { QueryEnvironment<T> env, VariableContext<T> context, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService); + ForkJoinPool forkJoinPool); /** A {@link QueryExpressionEvalListener} that does nothing. */ class NullListener<T> implements QueryExpressionEvalListener<T> { @@ -60,7 +60,7 @@ public interface QueryExpressionEvalListener<T> { QueryEnvironment<T> env, VariableContext<T> context, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) { + ForkJoinPool forkJoinPool) { } } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java index 44a40c5aa5..9b2401405a 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/RegexFilterExpression.java @@ -16,11 +16,11 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.base.Predicate; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; import java.util.List; +import java.util.concurrent.ForkJoinPool; import java.util.regex.Pattern; /** @@ -72,7 +72,7 @@ public abstract class RegexFilterExpression implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java index c482c2ec7a..7913e2dbfa 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomeFunction.java @@ -16,12 +16,12 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; import java.util.List; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -79,7 +79,7 @@ class SomeFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java index fd5a527224..2d0df0ef59 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/SomePathFunction.java @@ -17,13 +17,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; import java.util.List; import java.util.Set; +import java.util.concurrent.ForkJoinPool; /** * A somepath(x, y) query expression, which computes the set of nodes @@ -94,7 +94,7 @@ class SomePathFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java index ac83b78be1..d802edd0c2 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/TestsFunction.java @@ -15,7 +15,6 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; @@ -28,6 +27,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ForkJoinPool; /** * A tests(x) filter expression, which returns all the tests in set x, @@ -93,7 +93,7 @@ class TestsFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java index 2506d85514..532f331378 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/VisibleFunction.java @@ -15,12 +15,12 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.Argument; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.ArgumentType; import com.google.devtools.build.lib.query2.engine.QueryEnvironment.QueryFunction; import java.util.List; import java.util.Set; +import java.util.concurrent.ForkJoinPool; /** * A visible(x, y) query expression, which computes the subset of nodes in y @@ -78,7 +78,7 @@ public class VisibleFunction implements QueryFunction { QueryExpression expression, List<Argument> args, ThreadSafeCallback<T> callback, - ListeningExecutorService executorService) throws QueryException, InterruptedException { + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { eval(env, context, expression, args, callback); } diff --git a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java index a01a1f4f33..3ed2394dca 100644 --- a/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java +++ b/src/main/java/com/google/devtools/build/lib/skyframe/RecursivePackageProviderBackedTargetPatternResolver.java @@ -22,9 +22,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.RepositoryName; @@ -47,9 +44,11 @@ import com.google.devtools.build.lib.vfs.PathFragment; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; /** * A {@link TargetPatternResolver} backed by a {@link RecursivePackageProvider}. @@ -64,13 +63,13 @@ public class RecursivePackageProviderBackedTargetPatternResolver private final RecursivePackageProvider recursivePackageProvider; private final EventHandler eventHandler; private final FilteringPolicy policy; - private final ListeningExecutorService executor; + private final ExecutorService executor; public RecursivePackageProviderBackedTargetPatternResolver( RecursivePackageProvider recursivePackageProvider, EventHandler eventHandler, FilteringPolicy policy, - ListeningExecutorService executor) { + ExecutorService executor) { this.recursivePackageProvider = recursivePackageProvider; this.eventHandler = eventHandler; this.policy = policy; @@ -147,7 +146,7 @@ public class RecursivePackageProviderBackedTargetPatternResolver private Map<PackageIdentifier, ResolvedTargets<Target>> bulkGetTargetsInPackage( String originalPattern, Iterable<PackageIdentifier> pkgIds, FilteringPolicy policy) - throws TargetParsingException, InterruptedException { + throws InterruptedException { try { Map<PackageIdentifier, Package> pkgs = bulkGetPackages(pkgIds); if (pkgs.size() != Iterables.size(pkgIds)) { @@ -204,74 +203,51 @@ public class RecursivePackageProviderBackedTargetPatternResolver } }); final AtomicBoolean foundTarget = new AtomicBoolean(false); - final AtomicReference<InterruptedException> interrupt = new AtomicReference<>(); - final AtomicReference<TargetParsingException> parsingException = new AtomicReference<>(); - final AtomicReference<Exception> genericException = new AtomicReference<>(); - final Object callbackLock = new Object(); // For very large sets of packages, we may not want to process all of them at once, so we split // into batches. List<List<PackageIdentifier>> partitions = ImmutableList.copyOf(Iterables.partition(pkgIds, MAX_PACKAGES_BULK_GET)); - ArrayList<ListenableFuture<?>> futures = new ArrayList<>(partitions.size()); + ArrayList<Callable<Void>> callables = new ArrayList<>(partitions.size()); for (final Iterable<PackageIdentifier> pkgIdBatch : partitions) { - futures.add( - executor.submit( - new Runnable() { - @Override - public void run() { - Iterable<ResolvedTargets<Target>> resolvedTargets; - try { - resolvedTargets = - bulkGetTargetsInPackage(originalPattern, pkgIdBatch, NO_FILTER).values(); - } catch (InterruptedException e) { - interrupt.compareAndSet(null, e); - return; - } catch (TargetParsingException e) { - parsingException.compareAndSet(null, e); - return; - } catch (RuntimeException e) { - // In particular, we're interested in remembering any thrown - // MissingDepExceptions. - genericException.compareAndSet(null, e); - return; - } - - List<Target> filteredTargets = new ArrayList<>(calculateSize(resolvedTargets)); - for (ResolvedTargets<Target> targets : resolvedTargets) { - for (Target target : targets.getTargets()) { - // Perform the no-targets-found check before applying the filtering policy - // so we only return the error if the input directory's subtree really - // contains no targets. - foundTarget.set(true); - if (actualPolicy.shouldRetain(target, false)) { - filteredTargets.add(target); - } - } - } - try { - synchronized (callbackLock) { - callback.process(filteredTargets); - } - } catch (InterruptedException e) { - interrupt.compareAndSet(null, e); - } catch (Exception e) { - genericException.compareAndSet(e, null); - } + callables.add(new Callable<Void>() { + @Override + public Void call() throws E, TargetParsingException, InterruptedException { + Iterable<ResolvedTargets<Target>> resolvedTargets = + bulkGetTargetsInPackage(originalPattern, pkgIdBatch, NO_FILTER).values(); + List<Target> filteredTargets = new ArrayList<>(calculateSize(resolvedTargets)); + for (ResolvedTargets<Target> targets : resolvedTargets) { + for (Target target : targets.getTargets()) { + // Perform the no-targets-found check before applying the filtering policy + // so we only return the error if the input directory's subtree really + // contains no targets. + foundTarget.set(true); + if (actualPolicy.shouldRetain(target, false)) { + filteredTargets.add(target); } - })); + } + } + synchronized (callbackLock) { + callback.process(filteredTargets); + } + return null; + } + }); } - try { - Futures.allAsList(futures).get(); - } catch (ExecutionException e) { - throw new IllegalStateException(e); + // Note that ExecutorService#invokeAll _does_ block until all the Callables have been run. + List<Future<Void>> futures = executor.invokeAll(callables); + for (Future<Void> future : futures) { + try { + future.get(); + } catch (ExecutionException e) { + Throwables.propagateIfPossible(e.getCause(), exceptionClass); + Throwables.propagateIfPossible( + e.getCause(), TargetParsingException.class, InterruptedException.class); + throw new IllegalStateException(e); + } } - - Throwables.propagateIfInstanceOf(interrupt.get(), InterruptedException.class); - Throwables.propagateIfInstanceOf(parsingException.get(), TargetParsingException.class); - Throwables.propagateIfPossible(genericException.get(), exceptionClass); if (!foundTarget.get()) { throw new TargetParsingException("no targets found beneath '" + pathFragment + "'"); } |