aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2016-09-21 17:17:33 +0000
committerGravatar Laszlo Csomor <laszlocsomor@google.com>2016-09-22 09:52:33 +0000
commit2e2b459aaff7e33d9a9fce6a89ef32634b99f439 (patch)
tree34809e36954745de09995c1401efd87d39144204 /src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
parente67731a2444b08e0b84284073b4f463b733062fd (diff)
Use ForkJoinPool, rather than ListeningExecutorService, for parallel query evaluation in SkyQueryEnvironment. FJP is nicer to program against, imo.
-- MOS_MIGRATED_REVID=133844508
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java53
1 files changed, 37 insertions, 16 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 4de4f87719..39385ddd78 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -28,16 +28,13 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.cmdline.Label;
import com.google.devtools.build.lib.cmdline.LabelSyntaxException;
import com.google.devtools.build.lib.cmdline.PackageIdentifier;
import com.google.devtools.build.lib.cmdline.TargetParsingException;
import com.google.devtools.build.lib.cmdline.TargetPattern;
import com.google.devtools.build.lib.collect.CompactHashSet;
-import com.google.devtools.build.lib.concurrent.ExecutorUtil;
+import com.google.devtools.build.lib.concurrent.NamedForkJoinPool;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
@@ -103,7 +100,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -136,14 +133,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
private final List<String> universeScope;
protected final String parserPrefix;
private final PathPackageLocator pkgPath;
-
- // Note that the executor returned by Executors.newFixedThreadPool doesn't start any threads
- // unless work is submitted to it.
- private final ListeningExecutorService threadPool =
- MoreExecutors.listeningDecorator(
- Executors.newFixedThreadPool(
- DEFAULT_THREAD_COUNT,
- new ThreadFactoryBuilder().setNameFormat("QueryEnvironment-%d").build()));
+ private final ForkJoinPool forkJoinPool;
// The following fields are set in the #beforeEvaluateQuery method.
protected WalkableGraph graph;
@@ -161,6 +151,34 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
WalkableGraphFactory graphFactory,
List<String> universeScope,
PathPackageLocator pkgPath) {
+ this(
+ keepGoing,
+ loadingPhaseThreads,
+ // SkyQueryEnvironment operates on a prepopulated Skyframe graph. Therefore, query
+ // evaluation is completely CPU-bound.
+ /*queryEvaluationParallelismLevel=*/ DEFAULT_THREAD_COUNT,
+ eventHandler,
+ settings,
+ extraFunctions,
+ evalListener,
+ parserPrefix,
+ graphFactory,
+ universeScope,
+ pkgPath);
+ }
+
+ protected SkyQueryEnvironment(
+ boolean keepGoing,
+ int loadingPhaseThreads,
+ int queryEvaluationParallelismLevel,
+ EventHandler eventHandler,
+ Set<Setting> settings,
+ Iterable<QueryFunction> extraFunctions,
+ QueryExpressionEvalListener<Target> evalListener,
+ String parserPrefix,
+ WalkableGraphFactory graphFactory,
+ List<String> universeScope,
+ PathPackageLocator pkgPath) {
super(
keepGoing,
/*strictScope=*/ true,
@@ -170,6 +188,9 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
extraFunctions,
evalListener);
this.loadingPhaseThreads = loadingPhaseThreads;
+ // Note that ForkJoinPool doesn't start any thread until work is submitted to it.
+ this.forkJoinPool = NamedForkJoinPool.newNamedPool(
+ "QueryEnvironment", queryEvaluationParallelismLevel);
this.graphFactory = graphFactory;
this.pkgPath = pkgPath;
this.universeScope = Preconditions.checkNotNull(universeScope);
@@ -201,7 +222,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
graphBackedRecursivePackageProvider,
eventHandler,
TargetPatternEvaluator.DEFAULT_FILTERING_POLICY,
- threadPool);
+ forkJoinPool);
}
/**
@@ -265,7 +286,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
@Override
public void close() {
- ExecutorUtil.interruptibleShutdown(threadPool);
+ forkJoinPool.shutdownNow();
}
@Override
@@ -505,7 +526,7 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target>
// TODO(bazel-team): Refactor QueryEnvironment et al. such that this optimization is enabled for
// all QueryEnvironment implementations.
if (callback instanceof ThreadSafeCallback) {
- expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, threadPool);
+ expr.parEval(this, context, (ThreadSafeCallback<Target>) callback, forkJoinPool);
} else {
expr.eval(this, context, callback);
}