From 1aa49da5e5f9e98ce3a2a0b96f387f2990a2363f Mon Sep 17 00:00:00 2001 From: Mark Schaller Date: Fri, 30 Oct 2015 22:37:04 +0000 Subject: Introduce ErrorClassifier Changes the AbstractQueueVisitor strategy for varying its response to unhandled exceptions from inheritance to composition. This will help with a forthcoming switch from inheritance to delegation for ValueVisitor's use of AbstractQueueVisitor. -- MOS_MIGRATED_REVID=106730708 --- .../build/lib/concurrent/AbstractQueueVisitor.java | 126 +++++++++++++++------ .../build/lib/concurrent/ErrorClassifier.java | 59 ++++++++++ .../build/skyframe/InvalidatingNodeVisitor.java | 34 +++--- .../devtools/build/skyframe/ParallelEvaluator.java | 39 ++++--- 4 files changed, 193 insertions(+), 65 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java (limited to 'src/main/java/com/google/devtools') diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java index 9f49798bde..d5b6f1a7e9 100644 --- a/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java +++ b/src/main/java/com/google/devtools/build/lib/concurrent/AbstractQueueVisitor.java @@ -148,6 +148,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { */ private boolean jobsMustBeStopped = false; + private final ErrorClassifier errorClassifier; + private static final Logger LOG = Logger.getLogger(AbstractQueueVisitor.class.getName()); /** @@ -182,7 +184,46 @@ public class AbstractQueueVisitor implements QuiescingExecutor { failFastOnException, failFastOnInterrupt, poolName, - EXECUTOR_FACTORY); + EXECUTOR_FACTORY, + ErrorClassifier.DEFAULT); + } + + /** + * Create the AbstractQueueVisitor. + * + * @param concurrent true if concurrency should be enabled. Only set to + * false for debugging. + * @param parallelism a measure of parallelism for the {@link ExecutorService}, such as {@code + * parallelism} in {@link java.util.concurrent.ForkJoinPool}, or both {@code + * corePoolSize} and {@code maximumPoolSize} in {@link ThreadPoolExecutor}. + * @param keepAliveTime the keep-alive time for the thread pool. + * @param units the time units of keepAliveTime. + * @param failFastOnException if true, don't run new actions after + * an uncaught exception. + * @param failFastOnInterrupt if true, don't run new actions after interrupt. + * @param poolName sets the name of threads spawn by this thread pool. If {@code null}, default + * thread naming will be used. + * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. + */ + public AbstractQueueVisitor( + boolean concurrent, + int parallelism, + long keepAliveTime, + TimeUnit units, + boolean failFastOnException, + boolean failFastOnInterrupt, + String poolName, + ErrorClassifier errorClassifier) { + this( + concurrent, + parallelism, + keepAliveTime, + units, + failFastOnException, + failFastOnInterrupt, + poolName, + EXECUTOR_FACTORY, + errorClassifier); } /** @@ -210,9 +251,11 @@ public class AbstractQueueVisitor implements QuiescingExecutor { boolean failFastOnException, boolean failFastOnInterrupt, String poolName, - Function executorFactory) { + Function executorFactory, + ErrorClassifier errorClassifier) { Preconditions.checkNotNull(poolName); Preconditions.checkNotNull(executorFactory); + Preconditions.checkNotNull(errorClassifier); this.concurrent = concurrent; this.failFastOnException = failFastOnException; this.failFastOnInterrupt = failFastOnInterrupt; @@ -223,6 +266,7 @@ public class AbstractQueueVisitor implements QuiescingExecutor { new ExecutorParams( parallelism, keepAliveTime, units, poolName, new BlockingStack())) : null; + this.errorClassifier = errorClassifier; } /** @@ -255,7 +299,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { failFastOnException, true, poolName, - EXECUTOR_FACTORY); + EXECUTOR_FACTORY, + ErrorClassifier.DEFAULT); } /** @@ -280,7 +325,8 @@ public class AbstractQueueVisitor implements QuiescingExecutor { executor, shutdownOnCompletion, failFastOnException, - failFastOnInterrupt); + failFastOnInterrupt, + ErrorClassifier.DEFAULT); } /** @@ -307,6 +353,36 @@ public class AbstractQueueVisitor implements QuiescingExecutor { this.failFastOnInterrupt = failFastOnInterrupt; this.ownExecutorService = shutdownOnCompletion; this.pool = executor; + this.errorClassifier = ErrorClassifier.DEFAULT; + } + + /** + * Create the AbstractQueueVisitor. + * + * @param concurrent if false, run tasks inline instead of using the thread pool. + * @param executor The ThreadPool to use. + * @param shutdownOnCompletion If true, pass ownership of the Threadpool to + * this class. The pool will be shut down after a + * call to work(). Callers must not shut down the + * threadpool while queue visitors use it. + * @param failFastOnException if true, don't run new actions after + * an uncaught exception. + * @param failFastOnInterrupt if true, don't run new actions after interrupt. + * @param errorClassifier an error classifier used to determine whether to log and/or stop jobs. + */ + public AbstractQueueVisitor( + boolean concurrent, + ThreadPoolExecutor executor, + boolean shutdownOnCompletion, + boolean failFastOnException, + boolean failFastOnInterrupt, + ErrorClassifier errorClassifier) { + this.concurrent = concurrent; + this.failFastOnException = failFastOnException; + this.failFastOnInterrupt = failFastOnInterrupt; + this.ownExecutorService = shutdownOnCompletion; + this.pool = executor; + this.errorClassifier = errorClassifier; } /** @@ -321,7 +397,16 @@ public class AbstractQueueVisitor implements QuiescingExecutor { * thread naming will be used. */ public AbstractQueueVisitor(int parallelism, long keepAlive, TimeUnit units, String poolName) { - this(true, parallelism, keepAlive, units, false, true, poolName, EXECUTOR_FACTORY); + this( + true, + parallelism, + keepAlive, + units, + false, + true, + poolName, + EXECUTOR_FACTORY, + ErrorClassifier.DEFAULT); } @@ -612,42 +697,13 @@ public class AbstractQueueVisitor implements QuiescingExecutor { } } - /** Classification of an error thrown by an action. */ - protected enum ErrorClassification { - // All running actions should be stopped. - CRITICAL, - // Same as CRITICAL, but also log the error. - CRITICAL_AND_LOG, - // Other running actions should be left alone. - NOT_CRITICAL - } - - /** - * Classifies {@code e}. {@link Error}s are always classified as {@code CRITICAL_AND_LOG}. - * - *

Default value - always treat errors as {@code NOT_CRITICAL}. If different behavior is needed - * then we should override this method in subclasses. - * - * @param e the exception object to check - */ - protected ErrorClassification classifyError(Throwable e) { - return ErrorClassification.NOT_CRITICAL; - } - - private ErrorClassification classifyErrorInternal(Throwable e) { - if (e instanceof Error) { - return ErrorClassification.CRITICAL_AND_LOG; - } - return classifyError(e); - } - /** * If exception is critical then set a flag which signals * to stop all jobs inside {@link #awaitTermination(boolean)}. */ private synchronized void markToStopAllJobsIfNeeded(Throwable e) { boolean critical = false; - switch (classifyErrorInternal(e)) { + switch (errorClassifier.classify(e)) { case CRITICAL_AND_LOG: critical = true; LOG.log(Level.WARNING, "Found critical error in queue visitor", e); diff --git a/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java b/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java new file mode 100644 index 0000000000..250a7adcea --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/concurrent/ErrorClassifier.java @@ -0,0 +1,59 @@ +// Copyright 2015 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.concurrent; + +import com.google.common.base.Preconditions; + +/** A classifier for {@link Error}s and {@link Exception}s. */ +public abstract class ErrorClassifier { + + /** Classification of an error thrown by an action. */ + protected enum ErrorClassification { + /** All running actions should be stopped.*/ + CRITICAL, + /** Same as CRITICAL, but also log the error.*/ + CRITICAL_AND_LOG, + /** Other running actions should be left alone.*/ + NOT_CRITICAL + } + + /** Always treat exceptions as {@code NOT_CRITICAL}. */ + public static final ErrorClassifier DEFAULT = + new ErrorClassifier() { + @Override + protected ErrorClassification classifyException(Exception e) { + return ErrorClassification.NOT_CRITICAL; + } + }; + + /** + * Used by {@link #classify} to classify {@link Exception}s. (Note that {@link Error}s + * are always classified as {@code CRITICAL_AND_LOG}.) + * + * @param e the exception object to check + */ + protected abstract ErrorClassification classifyException(Exception e); + + /** + * Classify {@param e}. If {@code e} is an {@link Error}, it will be classified as {@code + * CRITICAL_AND_LOG}. Otherwise, calls {@link #classifyException}. + */ + public final ErrorClassification classify(Throwable e) { + if (e instanceof Error) { + return ErrorClassification.CRITICAL_AND_LOG; + } + Preconditions.checkArgument(e instanceof Exception, e); + return classifyException((Exception) e); + } +} diff --git a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java index 2d18f910fa..3c2c399bd5 100644 --- a/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java +++ b/src/main/java/com/google/devtools/build/skyframe/InvalidatingNodeVisitor.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.devtools.build.lib.concurrent.AbstractQueueVisitor; +import com.google.devtools.build.lib.concurrent.ErrorClassifier; import com.google.devtools.build.lib.concurrent.ExecutorParams; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.util.Pair; @@ -61,6 +62,16 @@ public abstract class InvalidatingNodeVisitor executorFactory) { - super(/*concurrent=*/true, - /*parallelism=*/DEFAULT_THREAD_COUNT, - /*keepAliveTime=*/1, - /*units=*/TimeUnit.SECONDS, - /*failFastOnException=*/true, - /*failFastOnInterrupt=*/true, + super( + /*concurrent=*/ true, + /*parallelism=*/ DEFAULT_THREAD_COUNT, + /*keepAliveTime=*/ 1, + /*units=*/ TimeUnit.SECONDS, + /*failFastOnException=*/ true, + /*failFastOnInterrupt=*/ true, "skyframe-invalidator", - executorFactory); + executorFactory, + errorClassifier); this.graph = Preconditions.checkNotNull(graph); this.invalidationReceiver = invalidationReceiver; this.dirtyKeyTracker = Preconditions.checkNotNull(dirtyKeyTracker); @@ -113,13 +126,6 @@ public abstract class InvalidatingNodeVisitor inflightNodes = Sets.newConcurrentHashSet(); private final Set crashes = Sets.newConcurrentHashSet(); private ValueVisitor(int threadCount) { - super(/*concurrent*/true, + super( + /*concurrent*/ true, threadCount, - 1, TimeUnit.SECONDS, - /*failFastOnException*/true, - /*failFastOnInterrupt*/true, - "skyframe-evaluator"); - } - - @Override - protected ErrorClassification classifyError(Throwable e) { - if (e instanceof SchedulerException) { - return ErrorClassification.CRITICAL; - } - if (e instanceof RuntimeException) { - return ErrorClassification.CRITICAL_AND_LOG; - } - return ErrorClassification.NOT_CRITICAL; + 1, + TimeUnit.SECONDS, + /*failFastOnException*/ true, + /*failFastOnInterrupt*/ true, + "skyframe-evaluator", + VALUE_VISITOR_ERROR_CLASSIFIER); } protected void waitForCompletion() throws InterruptedException { -- cgit v1.2.3