diff options
author | 2016-12-02 17:58:33 +0000 | |
---|---|---|
committer | 2016-12-02 19:09:07 +0000 | |
commit | e0a330577d9fe98169645cb68d9fc22cc787eeb6 (patch) | |
tree | 9b1d3dc955baa99133511b9134715e9c73649a85 /src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java | |
parent | a110ac400190c90a45856f15482c8d0952c542f5 (diff) |
For all function expressions of the form f(..., e1, ..., e2, ..., eK, ...), ensure that all of e1, e2, ..., and eK are elligble for parallel evaluation. This is _not_ the same as providing a parallel implementation of f, which we can do separately in followup CLs.
--
PiperOrigin-RevId: 140861694
MOS_MIGRATED_REVID=140861694
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java | 128 |
1 files changed, 124 insertions, 4 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java index 2d7be2ed74..b8087c51c5 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java @@ -17,9 +17,11 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; +import com.google.common.collect.Sets; import com.google.devtools.build.lib.collect.CompactHashSet; import java.util.Collections; import java.util.Set; +import javax.annotation.concurrent.ThreadSafe; /** Several query utilities to make easier to work with query callbacks and uniquifiers. */ public final class QueryUtil { @@ -31,6 +33,11 @@ public final class QueryUtil { Set<T> getResult(); } + /** A {@link AggregateAllCallback} that is a {@link ThreadSafeCallback}. */ + public interface ThreadSafeAggregateAllCallback<T> + extends AggregateAllCallback<T>, ThreadSafeCallback<T> { + } + /** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */ public abstract static class AggregateAllOutputFormatterCallback<T> extends OutputFormatterCallback<T> implements AggregateAllCallback<T> { @@ -38,6 +45,7 @@ public final class QueryUtil { private static class AggregateAllOutputFormatterCallbackImpl<T> extends AggregateAllOutputFormatterCallback<T> { + // We only add to the CompactHashSet, so iteration order is the same as insertion order. private final Set<T> result = CompactHashSet.create(); @Override @@ -51,12 +59,27 @@ public final class QueryUtil { } } + private static class ThreadSafeAggregateAllOutputFormatterCallbackImpl<T> + implements ThreadSafeAggregateAllCallback<T> { + private final Set<T> result = Sets.newConcurrentHashSet(); + + @Override + public final void process(Iterable<T> partialResult) { + Iterables.addAll(result, partialResult); + } + + @Override + public Set<T> getResult() { + return result; + } + } + /** * Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial * results into one set. * * <p>Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with - * {@link #newAggregateAllCallback}. + * {@link #newThreadSafeAggregateAllCallback}. */ public static <T> AggregateAllOutputFormatterCallback<T> newAggregateAllOutputFormatterCallback() { @@ -66,11 +89,21 @@ public final class QueryUtil { /** * Returns a fresh {@link AggregateAllCallback}. * + * <p>Intended to be used by {@link QueryEnvironment#eval} implementations that make use of + * neither of streaming evaluation nor parallel evaluation. + */ + public static <T> AggregateAllCallback<T> newOrderedAggregateAllCallback() { + return new AggregateAllOutputFormatterCallbackImpl<>(); + } + + /** + * Returns a fresh {@link ThreadSafeAggregateAllCallback}. + * * <p>Intended to be used by {@link QueryExpression} implementations; contrast with * {@link #newAggregateAllOutputFormatterCallback}. */ - public static <T> AggregateAllCallback<T> newAggregateAllCallback() { - return new AggregateAllOutputFormatterCallbackImpl<>(); + public static <T> AggregateAllCallback<T> newThreadSafeAggregateAllCallback() { + return new ThreadSafeAggregateAllOutputFormatterCallbackImpl<>(); } /** @@ -81,7 +114,7 @@ public final class QueryUtil { public static <T> Set<T> evalAll( QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr) throws QueryException, InterruptedException { - AggregateAllCallback<T> callback = newAggregateAllCallback(); + AggregateAllCallback<T> callback = newThreadSafeAggregateAllCallback(); env.eval(expr, context, callback); return callback.getResult(); } @@ -139,4 +172,91 @@ public final class QueryUtil { return result.build(); } } + + /** + * An abstraction for sending results to a {@link Callback} based on partial query results. + * + * <p>Intended as a convenience for {@link QueryFunction#eval}/{@link QueryFunction#parEval} + * implementations. + */ + @ThreadSafe + interface Processor<T> { + /** + * Processes the given {@code partialResult} and then invokes the given {@code callback} with + * a further result. + */ + void process(Iterable<T> partialResult, Callback<T> callback) + throws QueryException, InterruptedException; + } + + /** + * Similar to {@link Processor}, except the {@link #process} function also gets access to a + * {@link Uniquifier}. + */ + @ThreadSafe + interface ProcessorWithUniquifier<T> { + /** + * Processes the given {@code partialResult} using the given {@code uniquifier} and then invokes + * the given {@code callback} with a further result. + */ + void process(Iterable<T> partialResult, Uniquifier<T> uniquifier, Callback<T> callback) + throws QueryException, InterruptedException; + } + + /** + * Returns a new {@link Callback} that composes {@code callback} with {@code processor}. That is, + * this method returns a {@code c} such that {@code c.process(r)} calls + * {@code callback.process(r, callback)}. + * + * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback} + * is as well. + */ + public static <T> Callback<T> compose( + final Processor<T> processor, + final Callback<T> callback) { + return callback instanceof ThreadSafeCallback + ? new ThreadSafeCallback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, callback); + } + } + : new Callback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, callback); + } + }; + } + + /** + * Returns a new {@link Callback} that composes {@code callback} with {@code processor} (applied + * to {@code uniquifier}). That is, this method returns a {@code c} such that {@code c.process(r)} + * calls {@code callback.process(r, uniquifier, c)}. + * + * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback} is a + * {@link ThreadSafeCallback} and {@code uniquifier} is a {@link ThreadSafeUniquifier}. + */ + public static <T> Callback<T> compose( + final ProcessorWithUniquifier<T> processor, + final Uniquifier<T> uniquifier, + final Callback<T> callback) { + return (callback instanceof ThreadSafeCallback) && (uniquifier instanceof ThreadSafeUniquifier) + ? new ThreadSafeCallback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, uniquifier, callback); + } + } + : new Callback<T>() { + @Override + public void process(Iterable<T> partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, uniquifier, callback); + } + }; + } } |