diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java new file mode 100644 index 0000000000..388dd62511 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/ParallelQueryUtils.java @@ -0,0 +1,128 @@ +// Copyright 2016 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.query2.engine; + +import com.google.devtools.build.lib.concurrent.MoreFutures; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; + +/** Several utilities to aid in writing {@link QueryExpression#parEval} implementations. */ +public class ParallelQueryUtils { + /** + * Encapsulation of a subtask of parallel evaluation of a {@link QueryExpression}. See + * {@link #executeQueryTasksAndWaitInterruptibly}. + */ + public interface QueryTask { + void execute() throws QueryException, InterruptedException; + } + + /** + * Executes the given {@link QueryTask}s using the given {@link ForkJoinPool} and interruptibly + * waits for their completion. Throws the first {@link QueryException} or + * {@link InterruptedException} encountered during parallel execution. + */ + public static void executeQueryTasksAndWaitInterruptibly( + List<QueryTask> queryTasks, + ForkJoinPool forkJoinPool) throws QueryException, InterruptedException { + ArrayList<QueryTaskForkJoinTask> forkJoinTasks = new ArrayList<>(queryTasks.size()); + for (QueryTask queryTask : queryTasks) { + QueryTaskForkJoinTask forkJoinTask = adaptAsForkJoinTask(queryTask); + forkJoinTasks.add(forkJoinTask); + forkJoinPool.submit(forkJoinTask); + } + try { + MoreFutures.waitForAllInterruptiblyFailFast(forkJoinTasks); + } catch (ExecutionException e) { + throw rethrowCause(e); + } + } + + private static QueryTaskForkJoinTask adaptAsForkJoinTask(QueryTask queryTask) { + return new QueryTaskForkJoinTask(queryTask); + } + + private static RuntimeException rethrowCause(ExecutionException e) + throws QueryException, InterruptedException { + Throwable cause = e.getCause(); + if (cause instanceof ParallelRuntimeException) { + ((ParallelRuntimeException) cause).rethrow(); + } + throw new IllegalStateException(e); + } + + // ForkJoinTask#adapt(Callable) wraps thrown checked exceptions as RuntimeExceptions. We avoid + // having to think about that messiness (which is inconsistent with other Future implementations) + // by having our own ForkJoinTask subclass and managing checked exceptions ourselves. + private static class QueryTaskForkJoinTask extends ForkJoinTask<Void> { + private final QueryTask queryTask; + + private QueryTaskForkJoinTask(QueryTask queryTask) { + this.queryTask = queryTask; + } + + @Override + public Void getRawResult() { + return null; + } + + @Override + protected void setRawResult(Void value) { + } + + @Override + protected boolean exec() { + try { + queryTask.execute(); + } catch (QueryException queryException) { + throw new ParallelRuntimeQueryException(queryException); + } catch (InterruptedException interruptedException) { + throw new ParallelInterruptedQueryException(interruptedException); + } + return true; + } + } + + private abstract static class ParallelRuntimeException extends RuntimeException { + abstract void rethrow() throws QueryException, InterruptedException; + } + + private static class ParallelRuntimeQueryException extends ParallelRuntimeException { + private final QueryException queryException; + + private ParallelRuntimeQueryException(QueryException queryException) { + this.queryException = queryException; + } + + @Override + void rethrow() throws QueryException, InterruptedException { + throw queryException; + } + } + + private static class ParallelInterruptedQueryException extends ParallelRuntimeException { + private final InterruptedException interruptedException; + + private ParallelInterruptedQueryException(InterruptedException interruptedException) { + this.interruptedException = interruptedException; + } + + @Override + void rethrow() throws QueryException, InterruptedException { + throw interruptedException; + } + } +} |