From b11dd482eef2eb922686fb9ba96e39113cc1abd1 Mon Sep 17 00:00:00 2001 From: Nathan Harmata Date: Mon, 12 Dec 2016 17:15:04 +0000 Subject: Fix bad bug with the parallel implementation of BinaryOperatorExpression. Turns out that ForkJoinTask#adapt(Callable) returns a ForkJoinTask whose Future#get on error throws a ExecutionException wrapping a RuntimeException wrapping the thrown checked exception from the callable. This is documented behavior [1] that I incorrectly didn't know about. The additional level of wrapping meant that the catch-block of the parallel implementation of BinaryOperatorExpression wasn't rethrowing the InterruptedException/QueryException that the parallel task threw. The subtly in this bug is that the query expression being evaluated needs to be of the form "e1 + e2", where evaluation of "e1" throws a QueryException even in keepGoing mode (note that most of the query errors actually go through AbstractBlazeQueryEnvironment#reportBuildFileError). The test I wrote picks on LetExpression's evaluation-time (rather than e.g. parsing time) validation of the variable name. [1] https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinTask.html#adapt(java.util.concurrent.Callable) -- PiperOrigin-RevId: 141772584 MOS_MIGRATED_REVID=141772584 --- .../query2/engine/BinaryOperatorExpression.java | 33 ++---- .../lib/query2/engine/ParallelQueryUtils.java | 128 +++++++++++++++++++++ 2 files changed, 137 insertions(+), 24 deletions(-) create mode 100644 src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java (limited to 'src/main/java/com/google') diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java index d1efe80ff8..3f69b5b56b 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java @@ -13,20 +13,16 @@ // limitations under the License. package com.google.devtools.build.lib.query2.engine; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; -import com.google.devtools.build.lib.concurrent.MoreFutures; import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind; +import com.google.devtools.build.lib.query2.engine.ParallelQueryUtils.QueryTask; import com.google.devtools.build.lib.util.Preconditions; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.ForkJoinTask; /** * A binary algebraic set operation. @@ -133,27 +129,16 @@ public class BinaryOperatorExpression extends QueryExpression { final ThreadSafeCallback callback, ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { - ArrayList> tasks = new ArrayList<>(operands.size()); + ArrayList queryTasks = new ArrayList<>(operands.size()); for (final QueryExpression operand : operands) { - tasks.add(ForkJoinTask.adapt( - new Callable() { - @Override - public Void call() throws QueryException, InterruptedException { - env.eval(operand, context, callback); - return null; - } - })); - } - for (ForkJoinTask task : tasks) { - forkJoinPool.submit(task); - } - try { - MoreFutures.waitForAllInterruptiblyFailFast(tasks); - } catch (ExecutionException e) { - Throwables.propagateIfPossible( - e.getCause(), QueryException.class, InterruptedException.class); - throw new IllegalStateException(e); + queryTasks.add(new QueryTask() { + @Override + public void execute() throws QueryException, InterruptedException { + env.eval(operand, context, callback); + } + }); } + ParallelQueryUtils.executeQueryTasksAndWaitInterruptibly(queryTasks, forkJoinPool); } /** diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java new file mode 100644 index 0000000000..388dd62511 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java @@ -0,0 +1,128 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.query2.engine; + +import com.google.devtools.build.lib.concurrent.MoreFutures; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; + +/** Several utilities to aid in writing {@link QueryExpression#parEval} implementations. */ +public class ParallelQueryUtils { + /** + * Encapsulation of a subtask of parallel evaluation of a {@link QueryExpression}. See + * {@link #executeQueryTasksAndWaitInterruptibly}. + */ + public interface QueryTask { + void execute() throws QueryException, InterruptedException; + } + + /** + * Executes the given {@link QueryTask}s using the given {@link ForkJoinPool} and interruptibly + * waits for their completion. Throws the first {@link QueryException} or + * {@link InterruptedException} encountered during parallel execution. + */ + public static void executeQueryTasksAndWaitInterruptibly( + List queryTasks, + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { + ArrayList forkJoinTasks = new ArrayList<>(queryTasks.size()); + for (QueryTask queryTask : queryTasks) { + QueryTaskForkJoinTask forkJoinTask = adaptAsForkJoinTask(queryTask); + forkJoinTasks.add(forkJoinTask); + forkJoinPool.submit(forkJoinTask); + } + try { + MoreFutures.waitForAllInterruptiblyFailFast(forkJoinTasks); + } catch (ExecutionException e) { + throw rethrowCause(e); + } + } + + private static QueryTaskForkJoinTask adaptAsForkJoinTask(QueryTask queryTask) { + return new QueryTaskForkJoinTask(queryTask); + } + + private static RuntimeException rethrowCause(ExecutionException e) + throws QueryException, InterruptedException { + Throwable cause = e.getCause(); + if (cause instanceof ParallelRuntimeException) { + ((ParallelRuntimeException) cause).rethrow(); + } + throw new IllegalStateException(e); + } + + // ForkJoinTask#adapt(Callable) wraps thrown checked exceptions as RuntimeExceptions. We avoid + // having to think about that messiness (which is inconsistent with other Future implementations) + // by having our own ForkJoinTask subclass and managing checked exceptions ourselves. + private static class QueryTaskForkJoinTask extends ForkJoinTask { + private final QueryTask queryTask; + + private QueryTaskForkJoinTask(QueryTask queryTask) { + this.queryTask = queryTask; + } + + @Override + public Void getRawResult() { + return null; + } + + @Override + protected void setRawResult(Void value) { + } + + @Override + protected boolean exec() { + try { + queryTask.execute(); + } catch (QueryException queryException) { + throw new ParallelRuntimeQueryException(queryException); + } catch (InterruptedException interruptedException) { + throw new ParallelInterruptedQueryException(interruptedException); + } + return true; + } + } + + private abstract static class ParallelRuntimeException extends RuntimeException { + abstract void rethrow() throws QueryException, InterruptedException; + } + + private static class ParallelRuntimeQueryException extends ParallelRuntimeException { + private final QueryException queryException; + + private ParallelRuntimeQueryException(QueryException queryException) { + this.queryException = queryException; + } + + @Override + void rethrow() throws QueryException, InterruptedException { + throw queryException; + } + } + + private static class ParallelInterruptedQueryException extends ParallelRuntimeException { + private final InterruptedException interruptedException; + + private ParallelInterruptedQueryException(InterruptedException interruptedException) { + this.interruptedException = interruptedException; + } + + @Override + void rethrow() throws QueryException, InterruptedException { + throw interruptedException; + } + } +} -- cgit v1.2.3