aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2015-11-02 19:55:49 +0000
committerGravatar David Chen <dzc@google.com>2015-11-02 23:19:31 +0000
commit36e8c5b09ea48b03d79ad4ffd3c0bd8d6f7c02d6 (patch)
tree647a3b9249d8956d730d9d545484d8b9fd232271
parent9c1be138546bba63b6908f07eced461ca3daadc9 (diff)
Introduce ForkJoinQuiescingExecutor, permit its use in evaluation
This CL introduces a QuiescingExecutor implementation specialized for ForkJoinPools with the same interrupt handling, error propagation, and task completion semantics as AbstractQueueVisitor. Currently it does this by largely sharing its implementation with AQV. Future refactoring could let it rely more on ForkJoinPool's own awaitQuiescence implementation to avoid the overhead of AQV's remainingTasks counter maintenance. Subtasks spawned by tasks executing in ForkJoinQuiescingExecutor will rely on ForkJoinPool's thread-local task deques for low contention and (mostly) LIFO ordering. -- MOS_MIGRATED_REVID=106864395
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java44
-rw-r--r--src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java40
2 files changed, 83 insertions, 1 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
new file mode 100644
index 0000000000..34c2c42245
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java
@@ -0,0 +1,44 @@
+// Copyright 2015 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.concurrent;
+
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+
+/** A {@link QuiescingExecutor} implementation that wraps a {@link ForkJoinPool}. */
+// TODO(bazel-team): This extends AQV to ensure that they share the same semantics for interrupt
+// handling, error propagation, and task completion. Because FJP provides a native implementation
+// for awaitQuiescence, a careful refactoring would allow FJQE to avoid the overhead of
+// maintaining AQV.remainingTasks.
+public class ForkJoinQuiescingExecutor extends AbstractQueueVisitor {
+
+ public ForkJoinQuiescingExecutor(ForkJoinPool forkJoinPool, ErrorClassifier errorClassifier) {
+ super(
+ /*concurrent=*/ true,
+ forkJoinPool,
+ /*shutdownOnCompletion=*/ true,
+ /*failFastOnException=*/ true,
+ /*failFastOnInterrupt=*/ true,
+ errorClassifier);
+ }
+
+ @Override
+ protected void executeRunnable(Runnable runnable) {
+ if (ForkJoinTask.inForkJoinPool()) {
+ ForkJoinTask.adapt(runnable).fork();
+ } else {
+ super.executeRunnable(runnable);
+ }
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
index dcc1e07bc3..8356c64d28 100644
--- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
+++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java
@@ -32,6 +32,7 @@ import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder;
import com.google.devtools.build.lib.collect.nestedset.NestedSetVisitor;
import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor;
import com.google.devtools.build.lib.concurrent.ErrorClassifier;
+import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor;
import com.google.devtools.build.lib.concurrent.QuiescingExecutor;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
import com.google.devtools.build.lib.events.Event;
@@ -59,6 +60,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -145,6 +147,7 @@ public final class ParallelEvaluator implements Evaluator {
private final NestedSetVisitor<TaggedEvents> replayingNestedSetEventVisitor;
private final boolean keepGoing;
private final int threadCount;
+ @Nullable private final ForkJoinPool forkJoinPool;
@Nullable private final EvaluationProgressReceiver progressReceiver;
private final DirtyKeyTracker dirtyKeyTracker;
private final Receiver<Collection<SkyKey>> inflightKeysReceiver;
@@ -174,6 +177,34 @@ public final class ParallelEvaluator implements Evaluator {
this.replayingNestedSetEventVisitor =
new NestedSetVisitor<>(new NestedSetEventReceiver(reporter), emittedEventState);
this.storedEventFilter = storedEventFilter;
+ this.forkJoinPool = null;
+ }
+
+ public ParallelEvaluator(
+ ProcessableGraph graph,
+ Version graphVersion,
+ ImmutableMap<SkyFunctionName, ? extends SkyFunction> skyFunctions,
+ final EventHandler reporter,
+ EmittedEventState emittedEventState,
+ EventFilter storedEventFilter,
+ boolean keepGoing,
+ @Nullable EvaluationProgressReceiver progressReceiver,
+ DirtyKeyTracker dirtyKeyTracker,
+ Receiver<Collection<SkyKey>> inflightKeysReceiver,
+ ForkJoinPool forkJoinPool) {
+ this.graph = graph;
+ this.skyFunctions = skyFunctions;
+ this.graphVersion = graphVersion;
+ this.inflightKeysReceiver = inflightKeysReceiver;
+ this.reporter = Preconditions.checkNotNull(reporter);
+ this.keepGoing = keepGoing;
+ this.threadCount = 0;
+ this.progressReceiver = progressReceiver;
+ this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker);
+ this.replayingNestedSetEventVisitor =
+ new NestedSetVisitor<>(new NestedSetEventReceiver(reporter), emittedEventState);
+ this.storedEventFilter = storedEventFilter;
+ this.forkJoinPool = Preconditions.checkNotNull(forkJoinPool);
}
/**
@@ -619,6 +650,11 @@ public final class ParallelEvaluator implements Evaluator {
private final Set<SkyKey> inflightNodes = Sets.newConcurrentHashSet();
private final Set<RuntimeException> crashes = Sets.newConcurrentHashSet();
+ private ValueVisitor(ForkJoinPool forkJoinPool) {
+ quiescingExecutor =
+ new ForkJoinQuiescingExecutor(forkJoinPool, VALUE_VISITOR_ERROR_CLASSIFIER);
+ }
+
private ValueVisitor(int threadCount) {
quiescingExecutor =
new AbstractQueueVisitor(
@@ -1177,7 +1213,9 @@ public final class ParallelEvaluator implements Evaluator {
Profiler.instance().startTask(ProfilerTask.SKYFRAME_EVAL, skyKeySet);
try {
- return eval(skyKeySet, new ValueVisitor(threadCount));
+ ValueVisitor valueVisitor =
+ forkJoinPool == null ? new ValueVisitor(threadCount) : new ValueVisitor(forkJoinPool);
+ return eval(skyKeySet, valueVisitor);
} finally {
Profiler.instance().completeTask(ProfilerTask.SKYFRAME_EVAL);
}