aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/concurrent
diff options
context:
space:
mode:
authorGravatar Mark Schaller <mschaller@google.com>2015-10-13 21:23:32 +0000
committerGravatar David Chen <dzc@google.com>2015-10-14 18:29:03 +0000
commitfda203fb0789c53fbf9b59f6d2af27a25061abac (patch)
tree8b4cda3b759a3d99e6d045a0dac3c59e0e59006e /src/main/java/com/google/devtools/build/lib/concurrent
parentfa35b181de505a383300ab261dd180fd504ae105 (diff)
Mostly lockless updates of remainingTasks counter
Uses an AtomicLong to count remaining tasks. Only obtains the zeroRemainingTasks lock when remaining tasks have gone to zero or the codepath needs to wait on that condition. -- MOS_MIGRATED_REVID=105348523
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/concurrent')
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java67
1 files changed, 51 insertions, 16 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index b0ed18f818..5fe85bbf65 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -29,6 +29,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -106,9 +107,18 @@ public class AbstractQueueVisitor {
*/
private final boolean concurrent;
- // Condition variable for remainingTasks==0, and a lock for it.
+ /**
+ * An object used in the manner of a {@link java.util.concurrent.locks.Condition} object, for the
+ * condition {@code remainingTasks.get() == 0}.
+ * TODO(bazel-team): Replace with an actual {@link java.util.concurrent.locks.Condition} object.
+ */
private final Object zeroRemainingTasks = new Object();
- private long remainingTasks = 0;
+
+ /**
+ * If {@link #concurrent} is {@code true}, then this is a counter of the number of {@link
+ * Runnable}s {@link #enqueue}-d that have not finished evaluation.
+ */
+ private final AtomicLong remainingTasks = new AtomicLong(0);
// Map of thread ==> number of jobs executing in the thread.
// Currently used only for interrupt handling.
@@ -353,7 +363,7 @@ public class AbstractQueueVisitor {
* if a worker throws a critical error (see {@link #isCriticalError(Throwable)}). If
* false, just wait for them to terminate normally.
*/
- protected void work(boolean interruptWorkers) throws InterruptedException {
+ protected final void work(boolean interruptWorkers) throws InterruptedException {
if (concurrent) {
awaitTermination(interruptWorkers);
} else {
@@ -367,10 +377,18 @@ public class AbstractQueueVisitor {
* Schedules a call.
* Called in a worker thread if concurrent.
*/
- protected void enqueue(Runnable runnable) {
+ protected final void enqueue(Runnable runnable) {
if (concurrent) {
AtomicBoolean ranTask = new AtomicBoolean(false);
try {
+ // It's impossible for this increment to result in remainingTasks.get <= 0 because
+ // remainingTasks is never negative. Therefore it isn't necessary to check its value for
+ // the purpose of updating zeroRemainingTasks.
+ long tasks = remainingTasks.incrementAndGet();
+ Preconditions.checkState(
+ tasks > 0,
+ "Incrementing remaining tasks counter resulted in impossible non-positive number %s",
+ tasks);
pool.execute(wrapRunnable(runnable, ranTask));
} catch (Throwable e) {
if (!ranTask.get()) {
@@ -406,10 +424,22 @@ public class AbstractQueueVisitor {
}
}
+ /**
+ * Wraps {@param runnable} in a newly constructed {@link Runnable} {@code r} that:
+ * <ul>
+ * <li>Sets {@param ranTask} to {@code true} as soon as {@code r} starts to be evaluated,
+ * <li>Records the thread evaluating {@code r} in {@link #jobs} while {@code r} is evaluated,
+ * <li>Prevents {@param runnable} from being invoked if {@link #blockNewActions} returns
+ * {@code true},
+ * <li>Synchronously invokes {@code runnable.run()},
+ * <li>Catches any {@link Throwable} thrown by {@code runnable.run()}, and if it is the first
+ * {@link Throwable} seen by this {@link AbstractQueueVisitor}, assigns it to {@link
+ * #unhandled}, and calls {@link #markToStopAllJobsIfNeeded} to set {@link #jobsMustBeStopped}
+ * if necessary,
+ * <li>And, lastly, calls {@link #decrementRemainingTasks}.
+ * </ul>
+ */
private Runnable wrapRunnable(final Runnable runnable, final AtomicBoolean ranTask) {
- synchronized (zeroRemainingTasks) {
- remainingTasks++;
- }
return new Runnable() {
@Override
public void run() {
@@ -474,8 +504,15 @@ public class AbstractQueueVisitor {
}
private void decrementRemainingTasks() {
- synchronized (zeroRemainingTasks) {
- if (--remainingTasks == 0) {
+ // This decrement statement may result in remainingTasks.get() == 0, so it must be checked
+ // and the zeroRemainingTasks condition object notified if that condition is obtained.
+ long tasks = remainingTasks.decrementAndGet();
+ Preconditions.checkState(
+ tasks >= 0,
+ "Decrementing remaining tasks counter resulted in impossible negative number %s",
+ tasks);
+ if (tasks == 0) {
+ synchronized (zeroRemainingTasks) {
zeroRemainingTasks.notify();
}
}
@@ -513,7 +550,7 @@ public class AbstractQueueVisitor {
* Get the value of the interrupted flag.
*/
@ThreadSafety.ThreadSafe
- protected boolean isInterrupted() {
+ protected final boolean isInterrupted() {
return threadInterrupted;
}
@@ -522,10 +559,8 @@ public class AbstractQueueVisitor {
* if running tasks submit further jobs.
*/
@VisibleForTesting
- protected long getTaskCount() {
- synchronized (zeroRemainingTasks) {
- return remainingTasks;
- }
+ protected final long getTaskCount() {
+ return remainingTasks.get();
}
/**
@@ -538,7 +573,7 @@ public class AbstractQueueVisitor {
Throwables.propagateIfPossible(catastrophe);
try {
synchronized (zeroRemainingTasks) {
- while (remainingTasks != 0 && !jobsMustBeStopped) {
+ while (remainingTasks.get() != 0 && !jobsMustBeStopped) {
zeroRemainingTasks.wait();
}
}
@@ -581,7 +616,7 @@ public class AbstractQueueVisitor {
Throwables.propagateIfPossible(catastrophe);
synchronized (zeroRemainingTasks) {
- while (remainingTasks != 0) {
+ while (remainingTasks.get() != 0) {
try {
zeroRemainingTasks.wait();
} catch (InterruptedException e) {