diff options
author | tomlu <tomlu@google.com> | 2017-12-24 11:19:06 -0800 |
---|---|---|
committer | Copybara-Service <copybara-piper@google.com> | 2017-12-24 11:20:34 -0800 |
commit | 68d376335fad134be1191e721d593091a4eea1ec (patch) | |
tree | 50daf1ca77fa16024cdae0afe0cc5e7fbde25553 | |
parent | 782fb914bc670a177e74def9106185b7466befae (diff) |
Stop using AtomicLongMap in AbstractQueueVisitor.
This class generates tons of garbage. It's better to manually use a ConcurrentHashMap + AtomicLongs.
RELNOTES: None
PiperOrigin-RevId: 180053164
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java | 67 |
1 files changed, 35 insertions, 32 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index d08c9f0ff0..6b6c07517d 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -17,9 +17,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.AtomicLongMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -53,36 +54,36 @@ public class AbstractQueueVisitor implements QuiescingExecutor { p.getWorkQueue(), new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build()); /** - * The most severe unhandled exception thrown by a worker thread, according to - * {@link #errorClassifier}. This exception gets propagated to the calling thread of - * {@link #awaitQuiescence} . We use the most severe error for the sake of not masking e.g. - * crashes in worker threads after the first critical error that can occur due to race conditions - * in client code. + * The most severe unhandled exception thrown by a worker thread, according to {@link + * #errorClassifier}. This exception gets propagated to the calling thread of {@link + * #awaitQuiescence} . We use the most severe error for the sake of not masking e.g. crashes in + * worker threads after the first critical error that can occur due to race conditions in client + * code. * * <p>Field updates happen only in blocks that are synchronized on the {@link * AbstractQueueVisitor} object. * * <p>If {@link AbstractQueueVisitor} clients don't like the semantics of storing and propagating * the most severe error, then they should be provide an {@link ErrorClassifier} that does the - * right thing (e.g. to cause the _first_ error to be propagated, you'd want to provide an - * {@link ErrorClassifier} that gives all errors the exact same {@link ErrorClassification}). + * right thing (e.g. to cause the _first_ error to be propagated, you'd want to provide an {@link + * ErrorClassifier} that gives all errors the exact same {@link ErrorClassification}). * * <p>Note that this is not a performance-critical path. */ private volatile Throwable unhandled = null; /** - * An uncaught exception when submitting a job to the {@link ExecutorService} is catastrophic, - * and usually indicates a lack of stack space on which to allocate a native thread. The {@link - * ExecutorService} may reach an inconsistent state in such circumstances, so we avoid blocking - * on its termination when this field is non-{@code null}. + * An uncaught exception when submitting a job to the {@link ExecutorService} is catastrophic, and + * usually indicates a lack of stack space on which to allocate a native thread. The {@link + * ExecutorService} may reach an inconsistent state in such circumstances, so we avoid blocking on + * its termination when this field is non-{@code null}. */ private volatile Throwable catastrophe; /** * An object used in the manner of a {@link java.util.concurrent.locks.Condition} object, for the - * condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}. - * TODO(bazel-team): Replace with an actual {@link java.util.concurrent.locks.Condition} object. + * condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}. TODO(bazel-team): Replace + * with an actual {@link java.util.concurrent.locks.Condition} object. */ private final Object zeroRemainingTasks = new Object(); @@ -99,15 +100,15 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private volatile boolean jobsMustBeStopped = false; /** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */ - private final AtomicLongMap<Thread> jobs = AtomicLongMap.create(); + private final Map<Thread, AtomicLong> jobs = new ConcurrentHashMap<>(); private final ExecutorService executorService; /** - * Flag used to record when the main thread (the thread which called {@link #awaitQuiescence}) - * is interrupted. + * Flag used to record when the main thread (the thread which called {@link #awaitQuiescence}) is + * interrupted. * - * When this is {@code true}, adding tasks to the {@link ExecutorService} will fail quietly as + * <p>When this is {@code true}, adding tasks to the {@link ExecutorService} will fail quietly as * a part of the process of shutting down the worker threads. */ private volatile boolean threadInterrupted = false; @@ -117,6 +118,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * for testing. */ private final CountDownLatch interruptedLatch = new CountDownLatch(1); + private final CountDownLatch exceptionLatch = new CountDownLatch(1); /** If {@code true}, don't run new actions after an uncaught exception. */ @@ -254,7 +256,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * occupied with tasks. */ protected final int activeParallelTasks() { - return jobs.asMap().size(); + return jobs.size(); } protected void executeRunnable(Runnable runnable) { @@ -265,16 +267,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor { boolean critical = false; ErrorClassification errorClassification = errorClassifier.classify(e); switch (errorClassification) { - case AS_CRITICAL_AS_POSSIBLE: + case AS_CRITICAL_AS_POSSIBLE: case CRITICAL_AND_LOG: critical = true; logger.log(Level.WARNING, "Found critical error in queue visitor", e); break; - case CRITICAL: - critical = true; - break; - default: - break; + case CRITICAL: + critical = true; + break; + default: + break; } if (unhandled == null || errorClassification.compareTo(errorClassifier.classify(unhandled)) > 0) { @@ -314,15 +316,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor { /** * A wrapped {@link Runnable} that: + * * <ul> * <li>Sets {@link #run} to {@code true} when {@code WrappedRunnable} is run, * <li>Records the thread evaluating {@code r} in {@link #jobs} while {@code r} is evaluated, * <li>Prevents {@link #originalRunnable} from being invoked if {@link #blockNewActions} returns - * {@code true}, + * {@code true}, * <li>Synchronously invokes {@code runnable.run()}, * <li>Catches any {@link Throwable} thrown by {@code runnable.run()}, and if it is the most - * severe {@link Throwable} seen by this {@link AbstractQueueVisitor}, assigns it to - * {@link #unhandled}, and sets {@link #jobsMustBeStopped} if necessary, + * severe {@link Throwable} seen by this {@link AbstractQueueVisitor}, assigns it to {@link + * #unhandled}, and sets {@link #jobsMustBeStopped} if necessary, * <li>And, lastly, calls {@link #decrementRemainingTasks}. * </ul> */ @@ -365,11 +368,11 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } private void addJob(Thread thread) { - jobs.incrementAndGet(thread); + jobs.computeIfAbsent(thread, k -> new AtomicLong()).incrementAndGet(); } private void removeJob(Thread thread) { - if (jobs.decrementAndGet(thread) == 0) { + if (jobs.get(thread).decrementAndGet() == 0) { jobs.remove(thread); } } @@ -483,7 +486,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { if (ownExecutorService) { executorService.shutdown(); - for (;;) { + for (; ; ) { try { Throwables.propagateIfPossible(catastrophe); executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); @@ -497,7 +500,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private void interruptInFlightTasks() { Thread thisThread = Thread.currentThread(); - for (Thread thread : jobs.asMap().keySet()) { + for (Thread thread : jobs.keySet()) { if (thisThread != thread) { thread.interrupt(); } |