From 6ae05b419922193c4c253e51c9a5e483e4f947fa Mon Sep 17 00:00:00 2001 From: janakr Date: Mon, 13 Aug 2018 16:13:42 -0700 Subject: Order Skyframe evaluations in a priority queue, with all children of a given node having the same priority, later enqueueings having higher priority, re-enqueued nodes having highest priority, and new root nodes having lowest priority. Experimentally, this can save significant RAM (1.4G in some builds!) while not affecting speed. Also do a semi-drive-by deleting ExecutorFactory parameter to AbstractQueueVisitor, since it was always AbstractQueueVisitor.EXECUTOR_FACTORY. PiperOrigin-RevId: 208560889 --- .../build/lib/concurrent/AbstractQueueVisitor.java | 111 ++++++++++++++------- 1 file changed, 75 insertions(+), 36 deletions(-) (limited to 'src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java') diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index c4e526ba97..1eb64979d2 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -14,15 +14,16 @@ package com.google.devtools.build.lib.concurrent; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -32,27 +33,6 @@ import java.util.logging.Logger; /** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */ public class AbstractQueueVisitor implements QuiescingExecutor { - - /** - * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link - * ThreadPoolExecutor}s this creates have the same value for {@code corePoolSize} and {@code - * maximumPoolSize} because that results in a fixed-size thread pool, and the current use cases - * for {@link AbstractQueueVisitor} don't require any more sophisticated thread pool size - * management. - * - *

If client use cases change, they may invoke one of the {@link - * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link - * ThreadPoolExecutor}. - */ - public static final Function EXECUTOR_FACTORY = - p -> - new ThreadPoolExecutor( - /*corePoolSize=*/ p.getParallelism(), - /*maximumPoolSize=*/ p.getParallelism(), - p.getKeepAliveTime(), - p.getUnits(), - p.getWorkQueue(), - new ThreadFactoryBuilder().setNameFormat(p.getPoolName() + " %d").build()); /** * The most severe unhandled exception thrown by a worker thread, according to {@link * #errorClassifier}. This exception gets propagated to the calling thread of {@link @@ -104,6 +84,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private final ExecutorService executorService; + private final boolean usingPriorityQueue; + /** * Flag used to record when the main thread (the thread which called {@link #awaitQuiescence}) is * interrupted. @@ -131,21 +113,55 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private static final Logger logger = Logger.getLogger(AbstractQueueVisitor.class.getName()); + /** + * Default function for constructing {@link ThreadPoolExecutor}s. The {@link ThreadPoolExecutor}s + * this creates have the same value for {@code corePoolSize} and {@code maximumPoolSize} because + * that results in a fixed-size thread pool, and the current use cases for {@link + * AbstractQueueVisitor} don't require any more sophisticated thread pool size management. + * + *

If client use cases change, they may invoke one of the {@link + * AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link + * ThreadPoolExecutor}. + */ private static ExecutorService createExecutorService( int parallelism, long keepAliveTime, TimeUnit units, + BlockingQueue workQueue, + String poolName) { + return new ThreadPoolExecutor( + /*corePoolSize=*/ parallelism, + /*maximumPoolSize=*/ parallelism, + keepAliveTime, + units, + workQueue, + new ThreadFactoryBuilder() + .setNameFormat(Preconditions.checkNotNull(poolName) + " %d") + .build()); + } + + /** + * Creates an {@link AbstractQueueVisitor}, similar to {@link #AbstractQueueVisitor(int, long, + * TimeUnit, boolean, String, ErrorClassifier)}, but whose work is ordered by a {@link + * PriorityBlockingQueue}. The {@link Runnable} objects submitted to {@link #execute(Runnable)} + * must implement {@link Comparable}. + */ + public static AbstractQueueVisitor createWithPriorityQueue( + int parallelism, + long keepAliveTime, + TimeUnit units, + boolean failFastOnException, String poolName, - Function executorFactory) { - return Preconditions.checkNotNull(executorFactory) - .apply( - new ExecutorParams( - parallelism, - keepAliveTime, - units, - Preconditions.checkNotNull(poolName), - new BlockingStack())); + ErrorClassifier errorClassifier) { + return new AbstractQueueVisitor( + createExecutorService( + parallelism, keepAliveTime, units, new PriorityBlockingQueue<>(), poolName), + true, + failFastOnException, + errorClassifier, + /*usingPriorityQueue=*/ true); } + /** * Create the {@link AbstractQueueVisitor}. * @@ -157,7 +173,6 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code * null}, default thread naming will be used. - * @param executorFactory the factory for constructing the executor service. * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. */ public AbstractQueueVisitor( @@ -166,10 +181,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor { TimeUnit units, boolean failFastOnException, String poolName, - Function executorFactory, ErrorClassifier errorClassifier) { this( - createExecutorService(parallelism, keepAliveTime, units, poolName, executorFactory), + createExecutorService(parallelism, keepAliveTime, units, new BlockingStack<>(), poolName), true, failFastOnException, errorClassifier); @@ -190,10 +204,25 @@ public class AbstractQueueVisitor implements QuiescingExecutor { boolean shutdownOnCompletion, boolean failFastOnException, ErrorClassifier errorClassifier) { + this( + executorService, + shutdownOnCompletion, + failFastOnException, + errorClassifier, + /*usingPriorityQueue=*/ false); + } + + private AbstractQueueVisitor( + ExecutorService executorService, + boolean shutdownOnCompletion, + boolean failFastOnException, + ErrorClassifier errorClassifier, + boolean usingPriorityQueue) { this.failFastOnException = failFastOnException; this.ownExecutorService = shutdownOnCompletion; this.executorService = Preconditions.checkNotNull(executorService); this.errorClassifier = Preconditions.checkNotNull(errorClassifier); + this.usingPriorityQueue = usingPriorityQueue; } @Override @@ -218,6 +247,9 @@ public class AbstractQueueVisitor implements QuiescingExecutor { /** Schedules a call. Called in a worker thread. */ @Override public final void execute(Runnable runnable) { + if (usingPriorityQueue) { + Preconditions.checkState(runnable instanceof Comparable); + } WrappedRunnable wrappedRunnable = new WrappedRunnable(runnable); try { // It's impossible for this increment to result in remainingTasks.get <= 0 because @@ -239,7 +271,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } - protected void executeRunnable(Runnable runnable) { + protected void executeRunnable(WrappedRunnable runnable) { executorService.execute(runnable); } @@ -309,7 +341,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { *

  • And, lastly, calls {@link #decrementRemainingTasks}. * */ - private final class WrappedRunnable implements Runnable { + protected final class WrappedRunnable implements Runnable, Comparable { private final Runnable originalRunnable; private volatile boolean ran; @@ -345,6 +377,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } } + + @SuppressWarnings("unchecked") + @Override + public int compareTo(WrappedRunnable o) { + // This should only be called when the concrete class is submitting comparable runnables. + return ((Comparable) originalRunnable).compareTo(o.originalRunnable); + } } private void addJob(Thread thread) { -- cgit v1.2.3