From 2e2b459aaff7e33d9a9fce6a89ef32634b99f439 Mon Sep 17 00:00:00 2001 From: Nathan Harmata Date: Wed, 21 Sep 2016 17:17:33 +0000 Subject: Use ForkJoinPool, rather than ListeningExecutorService, for parallel query evaluation in SkyQueryEnvironment. FJP is nicer to program against, imo. -- MOS_MIGRATED_REVID=133844508 --- .../build/lib/query2/SkyQueryEnvironment.java | 53 +++++++++++++++------- 1 file changed, 37 insertions(+), 16 deletions(-) (limited to 'src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java') diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 4de4f87719..39385ddd78 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -28,16 +28,13 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.cmdline.Label; import com.google.devtools.build.lib.cmdline.LabelSyntaxException; import com.google.devtools.build.lib.cmdline.PackageIdentifier; import com.google.devtools.build.lib.cmdline.TargetParsingException; import com.google.devtools.build.lib.cmdline.TargetPattern; import com.google.devtools.build.lib.collect.CompactHashSet; -import com.google.devtools.build.lib.concurrent.ExecutorUtil; +import com.google.devtools.build.lib.concurrent.NamedForkJoinPool; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; @@ -103,7 +100,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; -import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -136,14 +133,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment private final List universeScope; protected final String parserPrefix; private final PathPackageLocator pkgPath; - - // Note that the executor returned by Executors.newFixedThreadPool doesn't start any threads - // unless work is submitted to it. - private final ListeningExecutorService threadPool = - MoreExecutors.listeningDecorator( - Executors.newFixedThreadPool( - DEFAULT_THREAD_COUNT, - new ThreadFactoryBuilder().setNameFormat("QueryEnvironment-%d").build())); + private final ForkJoinPool forkJoinPool; // The following fields are set in the #beforeEvaluateQuery method. protected WalkableGraph graph; @@ -161,6 +151,34 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment WalkableGraphFactory graphFactory, List universeScope, PathPackageLocator pkgPath) { + this( + keepGoing, + loadingPhaseThreads, + // SkyQueryEnvironment operates on a prepopulated Skyframe graph. Therefore, query + // evaluation is completely CPU-bound. + /*queryEvaluationParallelismLevel=*/ DEFAULT_THREAD_COUNT, + eventHandler, + settings, + extraFunctions, + evalListener, + parserPrefix, + graphFactory, + universeScope, + pkgPath); + } + + protected SkyQueryEnvironment( + boolean keepGoing, + int loadingPhaseThreads, + int queryEvaluationParallelismLevel, + EventHandler eventHandler, + Set settings, + Iterable extraFunctions, + QueryExpressionEvalListener evalListener, + String parserPrefix, + WalkableGraphFactory graphFactory, + List universeScope, + PathPackageLocator pkgPath) { super( keepGoing, /*strictScope=*/ true, @@ -170,6 +188,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment extraFunctions, evalListener); this.loadingPhaseThreads = loadingPhaseThreads; + // Note that ForkJoinPool doesn't start any thread until work is submitted to it. + this.forkJoinPool = NamedForkJoinPool.newNamedPool( + "QueryEnvironment", queryEvaluationParallelismLevel); this.graphFactory = graphFactory; this.pkgPath = pkgPath; this.universeScope = Preconditions.checkNotNull(universeScope); @@ -201,7 +222,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment graphBackedRecursivePackageProvider, eventHandler, TargetPatternEvaluator.DEFAULT_FILTERING_POLICY, - threadPool); + forkJoinPool); } /** @@ -265,7 +286,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment @Override public void close() { - ExecutorUtil.interruptibleShutdown(threadPool); + forkJoinPool.shutdownNow(); } @Override @@ -505,7 +526,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment // TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for // all QueryEnvironment implementations. if (callback instanceof ThreadSafeCallback) { - expr.parEval(this, context, (ThreadSafeCallback) callback, threadPool); + expr.parEval(this, context, (ThreadSafeCallback) callback, forkJoinPool); } else { expr.eval(this, context, callback); } -- cgit v1.2.3