aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java128
1 files changed, 128 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
new file mode 100644
index 0000000000..388dd62511
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java
@@ -0,0 +1,128 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.query2.engine;
+
+import com.google.devtools.build.lib.concurrent.MoreFutures;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
+
+/** Several utilities to aid in writing {@link QueryExpression#parEval} implementations. */
+public class ParallelQueryUtils {
+ /**
+ * Encapsulation of a subtask of parallel evaluation of a {@link QueryExpression}. See
+ * {@link #executeQueryTasksAndWaitInterruptibly}.
+ */
+ public interface QueryTask {
+ void execute() throws QueryException, InterruptedException;
+ }
+
+ /**
+ * Executes the given {@link QueryTask}s using the given {@link ForkJoinPool} and interruptibly
+ * waits for their completion. Throws the first {@link QueryException} or
+ * {@link InterruptedException} encountered during parallel execution.
+ */
+ public static void executeQueryTasksAndWaitInterruptibly(
+ List<QueryTask> queryTasks,
+ ForkJoinPool forkJoinPool) throws QueryException, InterruptedException {
+ ArrayList<QueryTaskForkJoinTask> forkJoinTasks = new ArrayList<>(queryTasks.size());
+ for (QueryTask queryTask : queryTasks) {
+ QueryTaskForkJoinTask forkJoinTask = adaptAsForkJoinTask(queryTask);
+ forkJoinTasks.add(forkJoinTask);
+ forkJoinPool.submit(forkJoinTask);
+ }
+ try {
+ MoreFutures.waitForAllInterruptiblyFailFast(forkJoinTasks);
+ } catch (ExecutionException e) {
+ throw rethrowCause(e);
+ }
+ }
+
+ private static QueryTaskForkJoinTask adaptAsForkJoinTask(QueryTask queryTask) {
+ return new QueryTaskForkJoinTask(queryTask);
+ }
+
+ private static RuntimeException rethrowCause(ExecutionException e)
+ throws QueryException, InterruptedException {
+ Throwable cause = e.getCause();
+ if (cause instanceof ParallelRuntimeException) {
+ ((ParallelRuntimeException) cause).rethrow();
+ }
+ throw new IllegalStateException(e);
+ }
+
+ // ForkJoinTask#adapt(Callable) wraps thrown checked exceptions as RuntimeExceptions. We avoid
+ // having to think about that messiness (which is inconsistent with other Future implementations)
+ // by having our own ForkJoinTask subclass and managing checked exceptions ourselves.
+ private static class QueryTaskForkJoinTask extends ForkJoinTask<Void> {
+ private final QueryTask queryTask;
+
+ private QueryTaskForkJoinTask(QueryTask queryTask) {
+ this.queryTask = queryTask;
+ }
+
+ @Override
+ public Void getRawResult() {
+ return null;
+ }
+
+ @Override
+ protected void setRawResult(Void value) {
+ }
+
+ @Override
+ protected boolean exec() {
+ try {
+ queryTask.execute();
+ } catch (QueryException queryException) {
+ throw new ParallelRuntimeQueryException(queryException);
+ } catch (InterruptedException interruptedException) {
+ throw new ParallelInterruptedQueryException(interruptedException);
+ }
+ return true;
+ }
+ }
+
+ private abstract static class ParallelRuntimeException extends RuntimeException {
+ abstract void rethrow() throws QueryException, InterruptedException;
+ }
+
+ private static class ParallelRuntimeQueryException extends ParallelRuntimeException {
+ private final QueryException queryException;
+
+ private ParallelRuntimeQueryException(QueryException queryException) {
+ this.queryException = queryException;
+ }
+
+ @Override
+ void rethrow() throws QueryException, InterruptedException {
+ throw queryException;
+ }
+ }
+
+ private static class ParallelInterruptedQueryException extends ParallelRuntimeException {
+ private final InterruptedException interruptedException;
+
+ private ParallelInterruptedQueryException(InterruptedException interruptedException) {
+ this.interruptedException = interruptedException;
+ }
+
+ @Override
+ void rethrow() throws QueryException, InterruptedException {
+ throw interruptedException;
+ }
+ }
+}