aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar tomlu <tomlu@google.com>2017-12-24 11:19:06 -0800
committerGravatar Copybara-Service <copybara-piper@google.com>2017-12-24 11:20:34 -0800
commit68d376335fad134be1191e721d593091a4eea1ec (patch)
tree50daf1ca77fa16024cdae0afe0cc5e7fbde25553
parent782fb914bc670a177e74def9106185b7466befae (diff)
Stop using AtomicLongMap in AbstractQueueVisitor.
This class generates tons of garbage. It's better to manually use a ConcurrentHashMap + AtomicLongs. RELNOTES: None PiperOrigin-RevId: 180053164
-rw-r--r--src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java67
1 files changed, 35 insertions, 32 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
index d08c9f0ff0..6b6c07517d 100644
--- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
+++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java
@@ -17,9 +17,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AtomicLongMap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -53,36 +54,36 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
p.getWorkQueue(),
new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build());
/**
- * The most severe unhandled exception thrown by a worker thread, according to
- * {@link #errorClassifier}. This exception gets propagated to the calling thread of
- * {@link #awaitQuiescence} . We use the most severe error for the sake of not masking e.g.
- * crashes in worker threads after the first critical error that can occur due to race conditions
- * in client code.
+ * The most severe unhandled exception thrown by a worker thread, according to {@link
+ * #errorClassifier}. This exception gets propagated to the calling thread of {@link
+ * #awaitQuiescence} . We use the most severe error for the sake of not masking e.g. crashes in
+ * worker threads after the first critical error that can occur due to race conditions in client
+ * code.
*
* <p>Field updates happen only in blocks that are synchronized on the {@link
* AbstractQueueVisitor} object.
*
* <p>If {@link AbstractQueueVisitor} clients don't like the semantics of storing and propagating
* the most severe error, then they should be provide an {@link ErrorClassifier} that does the
- * right thing (e.g. to cause the _first_ error to be propagated, you'd want to provide an
- * {@link ErrorClassifier} that gives all errors the exact same {@link ErrorClassification}).
+ * right thing (e.g. to cause the _first_ error to be propagated, you'd want to provide an {@link
+ * ErrorClassifier} that gives all errors the exact same {@link ErrorClassification}).
*
* <p>Note that this is not a performance-critical path.
*/
private volatile Throwable unhandled = null;
/**
- * An uncaught exception when submitting a job to the {@link ExecutorService} is catastrophic,
- * and usually indicates a lack of stack space on which to allocate a native thread. The {@link
- * ExecutorService} may reach an inconsistent state in such circumstances, so we avoid blocking
- * on its termination when this field is non-{@code null}.
+ * An uncaught exception when submitting a job to the {@link ExecutorService} is catastrophic, and
+ * usually indicates a lack of stack space on which to allocate a native thread. The {@link
+ * ExecutorService} may reach an inconsistent state in such circumstances, so we avoid blocking on
+ * its termination when this field is non-{@code null}.
*/
private volatile Throwable catastrophe;
/**
* An object used in the manner of a {@link java.util.concurrent.locks.Condition} object, for the
- * condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}.
- * TODO(bazel-team): Replace with an actual {@link java.util.concurrent.locks.Condition} object.
+ * condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}. TODO(bazel-team): Replace
+ * with an actual {@link java.util.concurrent.locks.Condition} object.
*/
private final Object zeroRemainingTasks = new Object();
@@ -99,15 +100,15 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private volatile boolean jobsMustBeStopped = false;
/** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */
- private final AtomicLongMap<Thread> jobs = AtomicLongMap.create();
+ private final Map<Thread, AtomicLong> jobs = new ConcurrentHashMap<>();
private final ExecutorService executorService;
/**
- * Flag used to record when the main thread (the thread which called {@link #awaitQuiescence})
- * is interrupted.
+ * Flag used to record when the main thread (the thread which called {@link #awaitQuiescence}) is
+ * interrupted.
*
- * When this is {@code true}, adding tasks to the {@link ExecutorService} will fail quietly as
+ * <p>When this is {@code true}, adding tasks to the {@link ExecutorService} will fail quietly as
* a part of the process of shutting down the worker threads.
*/
private volatile boolean threadInterrupted = false;
@@ -117,6 +118,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* for testing.
*/
private final CountDownLatch interruptedLatch = new CountDownLatch(1);
+
private final CountDownLatch exceptionLatch = new CountDownLatch(1);
/** If {@code true}, don't run new actions after an uncaught exception. */
@@ -254,7 +256,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
* occupied with tasks.
*/
protected final int activeParallelTasks() {
- return jobs.asMap().size();
+ return jobs.size();
}
protected void executeRunnable(Runnable runnable) {
@@ -265,16 +267,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
boolean critical = false;
ErrorClassification errorClassification = errorClassifier.classify(e);
switch (errorClassification) {
- case AS_CRITICAL_AS_POSSIBLE:
+ case AS_CRITICAL_AS_POSSIBLE:
case CRITICAL_AND_LOG:
critical = true;
logger.log(Level.WARNING, "Found critical error in queue visitor", e);
break;
- case CRITICAL:
- critical = true;
- break;
- default:
- break;
+ case CRITICAL:
+ critical = true;
+ break;
+ default:
+ break;
}
if (unhandled == null
|| errorClassification.compareTo(errorClassifier.classify(unhandled)) > 0) {
@@ -314,15 +316,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
/**
* A wrapped {@link Runnable} that:
+ *
* <ul>
* <li>Sets {@link #run} to {@code true} when {@code WrappedRunnable} is run,
* <li>Records the thread evaluating {@code r} in {@link #jobs} while {@code r} is evaluated,
* <li>Prevents {@link #originalRunnable} from being invoked if {@link #blockNewActions} returns
- * {@code true},
+ * {@code true},
* <li>Synchronously invokes {@code runnable.run()},
* <li>Catches any {@link Throwable} thrown by {@code runnable.run()}, and if it is the most
- * severe {@link Throwable} seen by this {@link AbstractQueueVisitor}, assigns it to
- * {@link #unhandled}, and sets {@link #jobsMustBeStopped} if necessary,
+ * severe {@link Throwable} seen by this {@link AbstractQueueVisitor}, assigns it to {@link
+ * #unhandled}, and sets {@link #jobsMustBeStopped} if necessary,
* <li>And, lastly, calls {@link #decrementRemainingTasks}.
* </ul>
*/
@@ -365,11 +368,11 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
}
private void addJob(Thread thread) {
- jobs.incrementAndGet(thread);
+ jobs.computeIfAbsent(thread, k -> new AtomicLong()).incrementAndGet();
}
private void removeJob(Thread thread) {
- if (jobs.decrementAndGet(thread) == 0) {
+ if (jobs.get(thread).decrementAndGet() == 0) {
jobs.remove(thread);
}
}
@@ -483,7 +486,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
if (ownExecutorService) {
executorService.shutdown();
- for (;;) {
+ for (; ; ) {
try {
Throwables.propagateIfPossible(catastrophe);
executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
@@ -497,7 +500,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor {
private void interruptInFlightTasks() {
Thread thisThread = Thread.currentThread();
- for (Thread thread : jobs.asMap().keySet()) {
+ for (Thread thread : jobs.keySet()) {
if (thisThread != thread) {
thread.interrupt();
}