// Copyright 2014 The Bazel Authors. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package com.google.devtools.build.lib.concurrent; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.util.concurrent.AtomicLongMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification; import com.google.devtools.build.lib.util.Preconditions; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; /** A {@link QuiescingExecutor} implementation that wraps an {@link ExecutorService}. */ public class AbstractQueueVisitor implements QuiescingExecutor { /** * Default factory function for constructing {@link ThreadPoolExecutor}s. The {@link * ThreadPoolExecutor}s this creates have the same value for {@code corePoolSize} and {@code * maximumPoolSize} because that results in a fixed-size thread pool, and the current use cases * for {@link AbstractQueueVisitor} don't require any more sophisticated thread pool size * management. * *
If client use cases change, they may invoke one of the {@link
* AbstractQueueVisitor#AbstractQueueVisitor} constructors that accepts a pre-constructed {@link
* ThreadPoolExecutor}.
*/
public static final Function Field updates happen only in blocks that are synchronized on the {@link
* AbstractQueueVisitor} object.
*
* If {@link AbstractQueueVisitor} clients don't like the semantics of storing and propagating
* the most severe error, then they should provide an {@link ErrorClassifier} that does the
* right thing (e.g. to cause the _first_ error to be propagated, you'd want to provide an
* {@link ErrorClassifier} that gives all errors the exact same {@link ErrorClassification}).
*
* Note that this is not a performance-critical path.
*/
private volatile Throwable unhandled = null;
/**
 * An uncaught exception when submitting a job to the {@link ExecutorService} is catastrophic,
 * and usually indicates a lack of stack space on which to allocate a native thread. The {@link
 * ExecutorService} may reach an inconsistent state in such circumstances, so we avoid blocking
 * on its termination when this field is non-{@code null}.
 *
 * <p>{@code reallyAwaitTermination} rethrows this value (when it is unchecked) before waiting for
 * the remaining tasks and again before each attempt to await executor termination.
 */
private volatile Throwable catastrophe;
/**
 * An object used in the manner of a {@link java.util.concurrent.locks.Condition} object, for the
 * condition {@code remainingTasks.get() == 0 || jobsMustBeStopped}.
 *
 * <p>{@code decrementRemainingTasks} notifies on this monitor when the task counter reaches zero,
 * and {@code reallyAwaitTermination} waits on it until the counter is zero.
 *
 * <p>TODO(bazel-team): Replace with an actual {@link java.util.concurrent.locks.Condition} object.
 */
private final Object zeroRemainingTasks = new Object();
/**
 * The number of {@link Runnable}s {@link #execute}-d that have not finished evaluation.
 * Decremented by {@code decrementRemainingTasks}, which every {@code WrappedRunnable} calls from
 * its outermost {@code finally} block.
 */
private final AtomicLong remainingTasks = new AtomicLong(0);
/**
 * Flag used to record when all threads were killed by failed action execution. Only ever
 * transitions from {@code false} to {@code true}.
 *
 * <p>Except for {@link #mustJobsBeStopped}, may only be accessed in a block that is synchronized
 * on {@link #zeroRemainingTasks}. {@link #mustJobsBeStopped} reads it without holding that
 * monitor, which is why the field is {@code volatile}.
 */
private volatile boolean jobsMustBeStopped = false;
/** Map from thread to number of jobs executing in the thread. Used for interrupt handling. */
private final AtomicLongMap This function returns the CURRENT state of whether jobs should be stopped. If the value is
* false right now, it may be changed to true by another thread later.
*/
protected final boolean mustJobsBeStopped() {
// Plain unsynchronized read of the volatile flag; the answer is only a snapshot and may already
// be stale by the time the caller acts on it.
return jobsMustBeStopped;
}
/**
 * Waits for the task queue to drain, or for the jobs to be flagged as "must be stopped". Then
 * delegates to {@code reallyAwaitTermination}, which optionally interrupts the workers that are
 * still running, waits for every remaining task to finish and, if {@code ownExecutorService} is
 * true, shuts down the {@link ExecutorService} and waits for it to terminate. Throws (the same)
 * unchecked exception if any worker thread failed unexpectedly.
 *
 * @param interruptWorkers whether worker threads that are still running after the initial wait
 *     should be interrupted
 * @throws InterruptedException if the calling thread or this visitor was interrupted and no
 *     unchecked worker exception takes precedence
 */
protected final void awaitTermination(boolean interruptWorkers) throws InterruptedException {
  // A catastrophic submission failure may leave the executor in an inconsistent state, so do not
  // start blocking at all if one was already recorded.
  Throwables.propagateIfPossible(catastrophe);
  try {
    // First give the queue a chance to drain normally. Without this wait, passing
    // interruptWorkers == true would interrupt healthy in-flight tasks immediately instead of
    // only after the queue drained or the jobs were flagged to stop.
    // NOTE(review): assumes whoever sets jobsMustBeStopped also notifies zeroRemainingTasks, as
    // the documented condition on that monitor implies -- confirm against the setter.
    synchronized (zeroRemainingTasks) {
      while (remainingTasks.get() != 0 && !jobsMustBeStopped) {
        zeroRemainingTasks.wait();
      }
    }
  } catch (InterruptedException e) {
    // Record the interruption on the visitor and fall through so that the workers are stopped
    // and the interruption is reported to the caller below.
    setInterrupted();
  }

  reallyAwaitTermination(interruptWorkers);

  if (isInterrupted()) {
    // Set interrupted bit on current thread so that callers can see that it was interrupted. Note
    // that if the thread was interrupted while awaiting termination, we might not hit this
    // code path, but then the current thread's interrupt bit is already set, so we are fine.
    Thread.currentThread().interrupt();
  }
  // Throw the first unhandled (worker thread) exception in the main thread. We throw an unchecked
  // exception instead of InterruptedException if both are present because an unchecked exception
  // may indicate a catastrophic failure that should shut down the program. The caller can
  // check the interrupted bit if they will handle the unchecked exception without crashing.
  Throwables.propagateIfPossible(unhandled);
  // Thread.interrupted() clears the interrupt bit, which is then conveyed by the exception.
  if (Thread.interrupted()) {
    throw new InterruptedException();
  }
}
/**
 * Blocks until every outstanding task has finished and, when this visitor owns its executor,
 * until that executor has shut down. Interruptions of the calling thread are recorded through
 * {@link #setInterrupted} rather than propagated, so the wait always runs to completion unless a
 * catastrophe is rethrown.
 *
 * @param interruptWorkers whether threads currently registered in {@code jobs} should be
 *     interrupted before waiting
 */
private void reallyAwaitTermination(boolean interruptWorkers) {
  // TODO(bazel-team): verify that interrupt() is safe for every use of
  // AbstractQueueVisitor and remove the interruptWorkers flag.
  if (interruptWorkers) {
    if (!jobs.isEmpty()) {
      interruptInFlightTasks();
    }
  }

  if (isInterrupted()) {
    interruptedLatch.countDown();
  }

  // Never block on an executor that a catastrophic failure may have left inconsistent.
  Throwables.propagateIfPossible(catastrophe);

  synchronized (zeroRemainingTasks) {
    while (remainingTasks.get() != 0) {
      try {
        zeroRemainingTasks.wait();
      } catch (InterruptedException e) {
        // Remember the interruption and keep waiting for the workers to finish.
        setInterrupted();
      }
    }
  }

  if (!ownExecutorService) {
    return;
  }

  executorService.shutdown();
  boolean doneWaiting = false;
  while (!doneWaiting) {
    try {
      Throwables.propagateIfPossible(catastrophe);
      executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
      // Stop after the first uninterrupted wait, whatever awaitTermination returned.
      doneWaiting = true;
    } catch (InterruptedException e) {
      setInterrupted();
    }
  }
}
/**
 * Interrupts every thread that currently has an entry in {@code jobs}, except the calling
 * thread itself.
 */
private void interruptInFlightTasks() {
  final Thread self = Thread.currentThread();
  for (Thread worker : jobs.asMap().keySet()) {
    if (worker == self) {
      // Never interrupt the thread that is doing the interrupting.
      continue;
    }
    worker.interrupt();
  }
}
}
*
*/
private final class WrappedRunnable implements Runnable {
  // The task handed in by the client.
  private final Runnable originalRunnable;
  // Set to true as the very first action of run().
  private volatile boolean ran;

  private WrappedRunnable(Runnable originalRunnable) {
    this.originalRunnable = originalRunnable;
  }

  /**
   * Registers the current thread in the jobs map, runs the wrapped task unless new actions are
   * blocked, hands any {@link Throwable} to {@code maybeSaveUnhandledThrowable}, and finally
   * unregisters the thread and decrements the remaining-task counter.
   */
  @Override
  public void run() {
    ran = true;
    Thread worker = null;
    boolean registered = false;
    try {
      worker = Thread.currentThread();
      addJob(worker);
      registered = true;
      // Make any newly enqueued tasks quickly die. We check after adding to the jobs map so
      // that if another thread is racing to kill this thread and didn't make it before this
      // conditional, it will be able to find and kill this thread anyway.
      if (!blockNewActions()) {
        originalRunnable.run();
      }
    } catch (Throwable t) {
      maybeSaveUnhandledThrowable(t, /*markToStopJobs=*/ true);
    } finally {
      try {
        if (registered && worker != null) {
          removeJob(worker);
        }
      } finally {
        // Must run even if unregistering fails, or the task counter would never reach zero.
        decrementRemainingTasks();
      }
    }
  }
}
/**
 * Increments the job count recorded for {@code thread} in {@code jobs}. Called by {@code
 * WrappedRunnable.run} before the wrapped task is executed.
 */
private void addJob(Thread thread) {
jobs.incrementAndGet(thread);
}
/**
 * Decrements the job count recorded for {@code thread} and drops the thread's entry from
 * {@code jobs} once that count reaches zero.
 */
private void removeJob(Thread thread) {
  final long stillRunning = jobs.decrementAndGet(thread);
  if (stillRunning == 0) {
    jobs.remove(thread);
  }
}
/**
 * Sets the internal {@code threadInterrupted} flag to show that an interrupt was detected. The
 * flag is read back through {@link #isInterrupted}; nothing in this method clears it or touches
 * the interrupt bit of the calling thread.
 */
protected final void setInterrupted() {
threadInterrupted = true;
}
/**
 * Decrements the remaining-task counter and, when it reaches zero, notifies a waiter on the
 * {@code zeroRemainingTasks} monitor.
 *
 * @throws IllegalStateException if the counter would become negative
 */
private void decrementRemainingTasks() {
  // The decrement may be the one that brings the counter to zero, in which case a thread
  // blocked on the zeroRemainingTasks condition object has to be woken up.
  final long remaining = remainingTasks.decrementAndGet();
  Preconditions.checkState(
      remaining >= 0,
      "Decrementing remaining tasks counter resulted in impossible negative number.");
  if (remaining != 0) {
    return;
  }
  synchronized (zeroRemainingTasks) {
    zeroRemainingTasks.notify();
  }
}
/**
 * If this returns true, don't enqueue new actions. That is the case once an interrupt has been
 * recorded, or once an unhandled throwable has been stored while {@code failFastOnException} is
 * set.
 */
protected boolean blockNewActions() {
  if (isInterrupted()) {
    return true;
  }
  return unhandled != null && failFastOnException;
}
/** Returns this visitor's {@code exceptionLatch}; intended for use by tests only. */
@VisibleForTesting
@Override
public final CountDownLatch getExceptionLatchForTestingOnly() {
return exceptionLatch;
}
/**
 * Returns this visitor's {@code interruptedLatch}; intended for use by tests only. The latch is
 * counted down by {@code reallyAwaitTermination} when the interrupted flag is already set.
 */
@VisibleForTesting
@Override
public final CountDownLatch getInterruptionLatchForTestingOnly() {
return interruptedLatch;
}
/**
 * Get the value of the interrupted flag, i.e. whether {@link #setInterrupted} has been called.
 * This reports the visitor's own {@code threadInterrupted} field, not the interrupt bit of the
 * calling thread.
 */
@ThreadSafety.ThreadSafe
protected final boolean isInterrupted() {
return threadInterrupted;
}
/**
 * Get number of jobs remaining. Note that this can increase in value if running tasks submit
 * further jobs.
 *
 * @return the current value of the {@code remainingTasks} counter; only a snapshot
 */
public final long getTaskCount() {
return remainingTasks.get();
}
/**
* Whether all running and pending jobs will be stopped or cancelled. Also newly submitted tasks
* will be rejected if this is true.
*
*