aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/engine
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2016-12-12 17:15:04 +0000
committerGravatar John Cater <jcater@google.com>2016-12-12 20:36:01 +0000
commitb11dd482eef2eb922686fb9ba96e39113cc1abd1 (patch)
tree7c2886d876310a638f74c054f80c2fd4910f037e /src/main/java/com/google/devtools/build/lib/query2/engine
parent8e5f864a024bc21f332f997cc503c0ff2fff6926 (diff)
Fix bad bug with the parallel implementation of BinaryOperatorExpression. Turns out that ForkJoinTask#adapt(Callable) returns a ForkJoinTask whose Future#get on error throws a ExecutionException wrapping a RuntimeException wrapping the thrown checked exception from the callable. This is documented behavior [1] that I incorrectly didn't know about.
The additional level of wrapping meant that the catch-block of the parallel implementation of BinaryOperatorExpression wasn't rethrowing the InterruptedException/QueryException that the parallel task threw. The subtly in this bug is that the query expression being evaluated needs to be of the form "e1 + e2", where evaluation of "e1" throws a QueryException even in keepGoing mode (note that most of the query errors actually go through AbstractBlazeQueryEnvironment#reportBuildFileError). The test I wrote picks on LetExpression's evaluation-time (rather than e.g. parsing time) validation of the variable name. [1] https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ForkJoinTask.html#adapt(java.util.concurrent.Callable) -- PiperOrigin-RevId: 141772584 MOS_MIGRATED_REVID=141772584
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java33
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java128
2 files changed, 137 insertions, 24 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
index d1efe80ff8..3f69b5b56b 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/BinaryOperatorExpression.java
@@ -13,20 +13,16 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
-import com.google.devtools.build.lib.concurrent.MoreFutures;
import com.google.devtools.build.lib.query2.engine.Lexer.TokenKind;
+import com.google.devtools.build.lib.query2.engine.ParallelQueryUtils.QueryTask;
import com.google.devtools.build.lib.util.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.ForkJoinTask;
/**
* A binary algebraic set operation.
@@ -133,27 +129,16 @@ public class BinaryOperatorExpression extends QueryExpression {
final ThreadSafeCallback<T> callback,
ForkJoinPool forkJoinPool)
throws QueryException, InterruptedException {
- ArrayList<ForkJoinTask<Void>> tasks = new ArrayList<>(operands.size());
+ ArrayList<QueryTask> queryTasks = new ArrayList<>(operands.size());
for (final QueryExpression operand : operands) {
- tasks.add(ForkJoinTask.adapt(
- new Callable<Void>() {
- @Override
- public Void call() throws QueryException, InterruptedException {
- env.eval(operand, context, callback);
- return null;
- }
- }));
- }
- for (ForkJoinTask<?> task : tasks) {
- forkJoinPool.submit(task);
- }
- try {
- MoreFutures.waitForAllInterruptiblyFailFast(tasks);
- } catch (ExecutionException e) {
- Throwables.propagateIfPossible(
- e.getCause(), QueryException.class, InterruptedException.class);
- throw new IllegalStateException(e);
+ queryTasks.add(new QueryTask() {
+ @Override
+ public void execute() throws QueryException, InterruptedException {
+ env.eval(operand, context, callback);
+ }
+ });
}
+ ParallelQueryUtils.executeQueryTasksAndWaitInterruptibly(queryTasks, forkJoinPool);
}
/**
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
new file mode 100644
index 0000000000..388dd62511
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
@@ -0,0 +1,128 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.devtools.build.lib.concurrent.MoreFutures;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+
+/** Several utilities to aid in writing {@link QueryExpression#parEval} implementations. */
+public class ParallelQueryUtils {
+ /**
+ * Encapsulation of a subtask of parallel evaluation of a {@link QueryExpression}. See
+ * {@link #executeQueryTasksAndWaitInterruptibly}.
+ */
+ public interface QueryTask {
+ void execute() throws QueryException, InterruptedException;
+ }
+
+ /**
+ * Executes the given {@link QueryTask}s using the given {@link ForkJoinPool} and interruptibly
+ * waits for their completion. Throws the first {@link QueryException} or
+ * {@link InterruptedException} encountered during parallel execution.
+ */
+ public static void executeQueryTasksAndWaitInterruptibly(
+ List<QueryTask> queryTasks,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ ArrayList<QueryTaskForkJoinTask> forkJoinTasks = new ArrayList<>(queryTasks.size());
+ for (QueryTask queryTask : queryTasks) {
+ QueryTaskForkJoinTask forkJoinTask = adaptAsForkJoinTask(queryTask);
+ forkJoinTasks.add(forkJoinTask);
+ forkJoinPool.submit(forkJoinTask);
+ }
+ try {
+ MoreFutures.waitForAllInterruptiblyFailFast(forkJoinTasks);
+ } catch (ExecutionException e) {
+ throw rethrowCause(e);
+ }
+ }
+
+ private static QueryTaskForkJoinTask adaptAsForkJoinTask(QueryTask queryTask) {
+ return new QueryTaskForkJoinTask(queryTask);
+ }
+
+ private static RuntimeException rethrowCause(ExecutionException e)
+ throws QueryException, InterruptedException {
+ Throwable cause = e.getCause();
+ if (cause instanceof ParallelRuntimeException) {
+ ((ParallelRuntimeException) cause).rethrow();
+ }
+ throw new IllegalStateException(e);
+ }
+
+ // ForkJoinTask#adapt(Callable) wraps thrown checked exceptions as RuntimeExceptions. We avoid
+ // having to think about that messiness (which is inconsistent with other Future implementations)
+ // by having our own ForkJoinTask subclass and managing checked exceptions ourselves.
+ private static class QueryTaskForkJoinTask extends ForkJoinTask<Void> {
+ private final QueryTask queryTask;
+
+ private QueryTaskForkJoinTask(QueryTask queryTask) {
+ this.queryTask = queryTask;
+ }
+
+ @Override
+ public Void getRawResult() {
+ return null;
+ }
+
+ @Override
+ protected void setRawResult(Void value) {
+ }
+
+ @Override
+ protected boolean exec() {
+ try {
+ queryTask.execute();
+ } catch (QueryException queryException) {
+ throw new ParallelRuntimeQueryException(queryException);
+ } catch (InterruptedException interruptedException) {
+ throw new ParallelInterruptedQueryException(interruptedException);
+ }
+ return true;
+ }
+ }
+
+ private abstract static class ParallelRuntimeException extends RuntimeException {
+ abstract void rethrow() throws QueryException, InterruptedException;
+ }
+
+ private static class ParallelRuntimeQueryException extends ParallelRuntimeException {
+ private final QueryException queryException;
+
+ private ParallelRuntimeQueryException(QueryException queryException) {
+ this.queryException = queryException;
+ }
+
+ @Override
+ void rethrow() throws QueryException, InterruptedException {
+ throw queryException;
+ }
+ }
+
+ private static class ParallelInterruptedQueryException extends ParallelRuntimeException {
+ private final InterruptedException interruptedException;
+
+ private ParallelInterruptedQueryException(InterruptedException interruptedException) {
+ this.interruptedException = interruptedException;
+ }
+
+ @Override
+ void rethrow() throws QueryException, InterruptedException {
+ throw interruptedException;
+ }
+ }
+}