diff options
author | 2016-05-26 19:56:24 +0000 | |
---|---|---|
committer | 2016-05-27 08:45:27 +0000 | |
commit | 3724d92fb85e021b6b65c3edb77c61572baedc25 (patch) | |
tree | 7f6c2f47b84e01d603d73c8d46c0265ade4be65e /src | |
parent | a4bfb3c49f164d47322b39c560a716b89788002b (diff) |
Allow AQV users to inject arbitrary handling of classified errors.
--
MOS_MIGRATED_REVID=123347295
Diffstat (limited to 'src')
8 files changed, 139 insertions, 33 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index 22a16e1c27..8c9fc30d65 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -18,6 +18,7 @@ import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.util.concurrent.AtomicLongMap; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification; import com.google.devtools.build.lib.util.Preconditions; import java.util.concurrent.CountDownLatch; @@ -136,6 +137,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { private final boolean ownExecutorService; private final ErrorClassifier errorClassifier; + private final ErrorHandler errorHandler; private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName()); @@ -171,7 +173,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { failFastOnInterrupt, poolName, EXECUTOR_FACTORY, - ErrorClassifier.DEFAULT); + ErrorClassifier.DEFAULT, + ErrorHandler.NullHandler.INSTANCE); } /** @@ -189,6 +192,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * @param poolName sets the name of threads spawned by the {@link ExecutorService}. If {@code * null}, default thread naming will be used. * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. + * @param errorHandler a handler for classified errors. */ public AbstractQueueVisitor( boolean concurrent, @@ -198,7 +202,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { boolean failFastOnException, boolean failFastOnInterrupt, String poolName, - ErrorClassifier errorClassifier) { + ErrorClassifier errorClassifier, + ErrorHandler errorHandler) { this( concurrent, parallelism, @@ -208,7 +213,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { failFastOnInterrupt, poolName, EXECUTOR_FACTORY, - errorClassifier); + errorClassifier, + errorHandler); } /** @@ -227,6 +233,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * null}, default thread naming will be used. * @param executorFactory the factory for constructing the executor service if {@code concurrent} * is {@code true}. + * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. + * @param errorHandler a handler for classified errors. */ public AbstractQueueVisitor( boolean concurrent, @@ -237,7 +245,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { boolean failFastOnInterrupt, String poolName, Function<ExecutorParams, ? extends ExecutorService> executorFactory, - ErrorClassifier errorClassifier) { + ErrorClassifier errorClassifier, + ErrorHandler errorHandler) { Preconditions.checkNotNull(poolName); Preconditions.checkNotNull(executorFactory); Preconditions.checkNotNull(errorClassifier); @@ -252,6 +261,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { parallelism, keepAliveTime, units, poolName, new BlockingStack<Runnable>())) : null; this.errorClassifier = errorClassifier; + this.errorHandler = errorHandler; } /** @@ -284,7 +294,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { true, poolName, EXECUTOR_FACTORY, - ErrorClassifier.DEFAULT); + ErrorClassifier.DEFAULT, + ErrorHandler.NullHandler.INSTANCE); } /** @@ -309,7 +320,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { shutdownOnCompletion, failFastOnException, failFastOnInterrupt, - ErrorClassifier.DEFAULT); + ErrorClassifier.DEFAULT, + ErrorHandler.NullHandler.INSTANCE); } /** @@ -338,6 +350,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { this.ownExecutorService = shutdownOnCompletion; this.executorService = executorService; this.errorClassifier = ErrorClassifier.DEFAULT; + this.errorHandler = ErrorHandler.NullHandler.INSTANCE; } /** @@ -353,6 +366,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * @param failFastOnException if {@code true}, don't run new actions after an uncaught exception. * @param failFastOnInterrupt if {@code true}, don't run new actions after an interrupt. * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. + * @param errorHandler a handler for classified errors. */ public AbstractQueueVisitor( boolean concurrent, @@ -360,7 +374,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { boolean shutdownOnCompletion, boolean failFastOnException, boolean failFastOnInterrupt, - ErrorClassifier errorClassifier) { + ErrorClassifier errorClassifier, + ErrorHandler errorHandler) { Preconditions.checkArgument(executorService != null || !concurrent); this.concurrent = concurrent; this.failFastOnException = failFastOnException; @@ -368,6 +383,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { this.ownExecutorService = shutdownOnCompletion; this.executorService = executorService; this.errorClassifier = errorClassifier; + this.errorHandler = errorHandler; } /** @@ -391,7 +407,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { true, poolName, EXECUTOR_FACTORY, - ErrorClassifier.DEFAULT); + ErrorClassifier.DEFAULT, + ErrorHandler.NullHandler.INSTANCE); } @@ -686,7 +703,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { */ private void markToStopAllJobsIfNeeded(Throwable e) { boolean critical = false; - switch (errorClassifier.classify(e)) { + ErrorClassification errorClassification = errorClassifier.classify(e); + switch (errorClassification) { case CRITICAL_AND_LOG: critical = true; LOG.log(Level.WARNING, "Found critical error in queue visitor", e); @@ -697,6 +715,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { default: break; } + errorHandler.handle(e, errorClassification); synchronized (zeroRemainingTasks) { if (critical && !jobsMustBeStopped) { jobsMustBeStopped = true; diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java b/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java index a50e2bef81..5a304198a9 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java @@ -15,11 +15,11 @@ package com.google.devtools.build.lib.concurrent; import com.google.devtools.build.lib.util.Preconditions; -/** A classifier for {@link Error}s and {@link Exception}s. */ +/** A classifier for {@link Error}s and {@link Exception}s. Used by {@link AbstractQueueVisitor}. */ public abstract class ErrorClassifier { /** Classification of an error thrown by an action. */ - protected enum ErrorClassification { + public enum ErrorClassification { /** All running actions should be stopped.*/ CRITICAL, /** Same as CRITICAL, but also log the error.*/ diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ErrorHandler.java b/src/main/java/com/google/devtools/build/lib/concurrent/ErrorHandler.java new file mode 100644 index 0000000000..923ec1f83f --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/concurrent/ErrorHandler.java @@ -0,0 +1,42 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import com.google.devtools.build.lib.concurrent.ErrorClassifier.ErrorClassification; + +/** A way to inject custom handling of errors encountered by {@link AbstractQueueVisitor}. */ +public interface ErrorHandler { + + /** + * Called by {@link AbstractQueueVisitor} right after using {@link ErrorClassifier} to classify + * the error, but right before actually acting on the classification. + * + * <p>Note that {@link Error}s are always classified as + * {@link ErrorClassification#CRITICAL_AND_LOG}. + */ + void handle(Throwable t, ErrorClassification classification); + + /** An {@link ErrorHandler} that does nothing. */ + class NullHandler implements ErrorHandler { + public static final NullHandler INSTANCE = new NullHandler(); + + private NullHandler() { + } + + @Override + public void handle(Throwable t, ErrorClassification classification) { + } + } +} + diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java index 34c2c42245..5aed120470 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/ForkJoinQuiescingExecutor.java @@ -23,14 +23,16 @@ import java.util.concurrent.ForkJoinTask; // maintaining AQV.remainingTasks. public class ForkJoinQuiescingExecutor extends AbstractQueueVisitor { - public ForkJoinQuiescingExecutor(ForkJoinPool forkJoinPool, ErrorClassifier errorClassifier) { + public ForkJoinQuiescingExecutor(ForkJoinPool forkJoinPool, ErrorClassifier errorClassifier, + ErrorHandler errorHandler) { super( /*concurrent=*/ true, forkJoinPool, /*shutdownOnCompletion=*/ true, /*failFastOnException=*/ true, /*failFastOnInterrupt=*/ true, - errorClassifier); + errorClassifier, + errorHandler); } @Override diff --git a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java index 49bcfa0165..bb6d2b2685 100644 --- a/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java +++ b/src/main/java/com/google/devtools/build/skyframe/EagerInvalidator.java @@ -15,6 +15,7 @@ package com.google.devtools.build.skyframe; import com.google.common.base.Function; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.devtools.build.lib.concurrent.ErrorHandler; import com.google.devtools.build.lib.concurrent.ExecutorParams; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DeletingNodeVisitor; import com.google.devtools.build.skyframe.InvalidatingNodeVisitor.DirtyingNodeVisitor; @@ -89,7 +90,8 @@ public final class EagerInvalidator { InvalidationState state, DirtyKeyTracker dirtyKeyTracker, ForkJoinPool forkJoinPool, - boolean supportInterruptions) { + boolean supportInterruptions, + ErrorHandler errorHandler) { state.update(diff); return state.isEmpty() ? null @@ -99,7 +101,8 @@ public final class EagerInvalidator { state, dirtyKeyTracker, forkJoinPool, - supportInterruptions); + supportInterruptions, + errorHandler); } /** @@ -133,7 +136,8 @@ public final class EagerInvalidator { InvalidationState state, DirtyKeyTracker dirtyKeyTracker, ForkJoinPool forkJoinPool, - boolean supportInterruptions) + boolean supportInterruptions, + ErrorHandler errorHandler) throws InterruptedException { DirtyingNodeVisitor visitor = createInvalidatingVisitorIfNeeded( @@ -143,7 +147,8 @@ public final class EagerInvalidator { state, dirtyKeyTracker, forkJoinPool, - supportInterruptions); + supportInterruptions, + errorHandler); if (visitor != null) { visitor.run(); } diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java index 78421cb3e7..bb7f05dd69 100644 --- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java @@ -22,6 +22,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.concurrent.ErrorClassifier; +import com.google.devtools.build.lib.concurrent.ErrorHandler; import com.google.devtools.build.lib.concurrent.ExecutorParams; import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor; import com.google.devtools.build.lib.concurrent.QuiescingExecutor; @@ -112,7 +113,8 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr /*failFastOnInterrupt=*/ true, "skyframe-invalidator", executorFactory, - errorClassifier); + errorClassifier, + ErrorHandler.NullHandler.INSTANCE); this.graph = Preconditions.checkNotNull(graph); this.invalidationReceiver = invalidationReceiver; this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker); @@ -124,8 +126,9 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr @Nullable EvaluationProgressReceiver invalidationReceiver, InvalidationState state, DirtyKeyTracker dirtyKeyTracker, - ForkJoinPool forkJoinPool) { - this.executor = new ForkJoinQuiescingExecutor(forkJoinPool, errorClassifier); + ForkJoinPool forkJoinPool, + ErrorHandler errorHandler) { + this.executor = new ForkJoinQuiescingExecutor(forkJoinPool, errorClassifier, errorHandler); this.graph = Preconditions.checkNotNull(graph); this.invalidationReceiver = invalidationReceiver; this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker); @@ -368,8 +371,9 @@ public abstract class InvalidatingNodeVisitor<TGraph extends ThinNodeQueryableGr InvalidationState state, DirtyKeyTracker dirtyKeyTracker, ForkJoinPool forkJoinPool, - boolean supportInterruptions) { - super(graph, invalidationReceiver, state, dirtyKeyTracker, forkJoinPool); + boolean supportInterruptions, + ErrorHandler errorHandler) { + super(graph, invalidationReceiver, state, dirtyKeyTracker, forkJoinPool, errorHandler); this.supportInterruptions = supportInterruptions; } diff --git a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java index b88ef42b61..d9307c6132 100644 --- a/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java +++ b/src/main/java/com/google/devtools/build/skyframe/ParallelEvaluator.java @@ -30,6 +30,7 @@ import com.google.devtools.build.lib.collect.nestedset.NestedSetBuilder; import com.google.devtools.build.lib.collect.nestedset.NestedSetVisitor; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; import com.google.devtools.build.lib.concurrent.ErrorClassifier; +import com.google.devtools.build.lib.concurrent.ErrorHandler; import com.google.devtools.build.lib.concurrent.ForkJoinQuiescingExecutor; import com.google.devtools.build.lib.concurrent.QuiescingExecutor; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; @@ -147,6 +148,7 @@ public final class ParallelEvaluator implements Evaluator { private final DirtyKeyTracker dirtyKeyTracker; private final Receiver<Collection<SkyKey>> inflightKeysReceiver; private final EventFilter storedEventFilter; + private final ErrorHandler errorHandler; public ParallelEvaluator( ProcessableGraph graph, @@ -174,6 +176,7 @@ public final class ParallelEvaluator implements Evaluator { new NestedSetVisitor<>(new NestedSetEventReceiver(reporter), emittedEventState); this.storedEventFilter = storedEventFilter; this.forkJoinPool = null; + this.errorHandler = ErrorHandler.NullHandler.INSTANCE; } public ParallelEvaluator( @@ -188,7 +191,8 @@ public final class ParallelEvaluator implements Evaluator { @Nullable EvaluationProgressReceiver progressReceiver, DirtyKeyTracker dirtyKeyTracker, Receiver<Collection<SkyKey>> inflightKeysReceiver, - ForkJoinPool forkJoinPool) { + ForkJoinPool forkJoinPool, + ErrorHandler errorHandler) { this.graph = graph; this.skyFunctions = skyFunctions; this.graphVersion = graphVersion; @@ -204,6 +208,7 @@ public final class ParallelEvaluator implements Evaluator { new NestedSetVisitor<>(new NestedSetEventReceiver(reporter), emittedEventState); this.storedEventFilter = storedEventFilter; this.forkJoinPool = Preconditions.checkNotNull(forkJoinPool); + this.errorHandler = errorHandler; } /** @@ -700,7 +705,7 @@ public final class ParallelEvaluator implements Evaluator { private ValueVisitor(ForkJoinPool forkJoinPool) { quiescingExecutor = - new ForkJoinQuiescingExecutor(forkJoinPool, VALUE_VISITOR_ERROR_CLASSIFIER); + new ForkJoinQuiescingExecutor(forkJoinPool, VALUE_VISITOR_ERROR_CLASSIFIER, errorHandler); } private ValueVisitor(int threadCount) { @@ -713,7 +718,8 @@ public final class ParallelEvaluator implements Evaluator { /*failFastOnException*/ true, /*failFastOnInterrupt*/ true, "skyframe-evaluator", - VALUE_VISITOR_ERROR_CLASSIFIER); + VALUE_VISITOR_ERROR_CLASSIFIER, + errorHandler); } private void waitForCompletion() throws InterruptedException { diff --git a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java index 64fc9f3c7c..7c751fce36 100644 --- a/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java +++ b/src/test/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitorTest.java @@ -347,8 +347,20 @@ public class AbstractQueueVisitorTest { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - final AbstractQueueVisitor visitor = - createQueueVisitorWithErrorClassification(executor, ErrorClassification.CRITICAL); + final AtomicBoolean throwableSeen = new AtomicBoolean(false); + ErrorHandler errorHandler = new ErrorHandler() { + @Override + public void handle(Throwable t, ErrorClassification classification) { + if (t == THROWABLE) { + assertThat(classification).isEqualTo(ErrorClassification.CRITICAL); + throwableSeen.compareAndSet(false, true); + } else { + fail(); + } + } + }; + final AbstractQueueVisitor visitor = createQueueVisitorWithConstantErrorClassification( + executor, ErrorClassification.CRITICAL, errorHandler); final CountDownLatch latch1 = new CountDownLatch(1); final AtomicBoolean wasInterrupted = new AtomicBoolean(false); @@ -381,18 +393,31 @@ public class AbstractQueueVisitorTest { assertTrue(wasInterrupted.get()); assertTrue(executor.isShutdown()); + assertTrue(throwableSeen.get()); } @Test public void javaErrorConsideredCriticalNoMatterWhat() throws Exception { ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); - AbstractQueueVisitor visitor = - createQueueVisitorWithErrorClassification(executor, ErrorClassification.NOT_CRITICAL); + final Error error = new Error("bad!"); + final AtomicBoolean criticalErrorSeen = new AtomicBoolean(false); + ErrorHandler errorHandler = new ErrorHandler() { + @Override + public void handle(Throwable t, ErrorClassification classification) { + if (t == error) { + assertThat(classification).isEqualTo(ErrorClassification.CRITICAL_AND_LOG); + criticalErrorSeen.compareAndSet(false, true); + } else { + fail(); + } + } + }; + AbstractQueueVisitor visitor = createQueueVisitorWithConstantErrorClassification( + executor, ErrorClassification.NOT_CRITICAL, errorHandler); final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean sleepFinished = new AtomicBoolean(false); final AtomicBoolean sleepInterrupted = new AtomicBoolean(false); - final Error error = new Error("bad!"); Runnable errorRunnable = new Runnable() { @Override public void run() { @@ -429,6 +454,7 @@ public class AbstractQueueVisitorTest { assertTrue(sleepInterrupted.get()); assertFalse(sleepFinished.get()); assertEquals(error, thrownError); + assertTrue(criticalErrorSeen.get()); } private Runnable throwingRunnable() { @@ -536,8 +562,9 @@ public class AbstractQueueVisitorTest { } } - private static AbstractQueueVisitor createQueueVisitorWithErrorClassification( - ThreadPoolExecutor executor, final ErrorClassification classification) { + private static AbstractQueueVisitor createQueueVisitorWithConstantErrorClassification( + ThreadPoolExecutor executor, final ErrorClassification classification, + ErrorHandler errorHandler) { return new AbstractQueueVisitor( /*concurrent=*/ true, executor, @@ -549,6 +576,7 @@ public class AbstractQueueVisitorTest { protected ErrorClassification classifyException(Exception e) { return classification; } - }); + }, + errorHandler); } } |