From e0a330577d9fe98169645cb68d9fc22cc787eeb6 Mon Sep 17 00:00:00 2001 From: Nathan Harmata Date: Fri, 2 Dec 2016 17:58:33 +0000 Subject: For all function expressions of the form f(..., e1, ..., e2, ..., eK, ...), ensure that all of e1, e2, ..., and eK are elligble for parallel evaluation. This is _not_ the same as providing a parallel implementation of f, which we can do separately in followup CLs. -- PiperOrigin-RevId: 140861694 MOS_MIGRATED_REVID=140861694 --- .../build/lib/query2/engine/QueryUtil.java | 128 ++++++++++++++++++++- 1 file changed, 124 insertions(+), 4 deletions(-) (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java') diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java index 2d7be2ed74..b8087c51c5 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java +++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java @@ -17,9 +17,11 @@ package com.google.devtools.build.lib.query2.engine; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; +import com.google.common.collect.Sets; import com.google.devtools.build.lib.collect.CompactHashSet; import java.util.Collections; import java.util.Set; +import javax.annotation.concurrent.ThreadSafe; /** Several query utilities to make easier to work with query callbacks and uniquifiers. */ public final class QueryUtil { @@ -31,6 +33,11 @@ public final class QueryUtil { Set getResult(); } + /** A {@link AggregateAllCallback} that is a {@link ThreadSafeCallback}. */ + public interface ThreadSafeAggregateAllCallback + extends AggregateAllCallback, ThreadSafeCallback { + } + /** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */ public abstract static class AggregateAllOutputFormatterCallback extends OutputFormatterCallback implements AggregateAllCallback { @@ -38,6 +45,7 @@ public final class QueryUtil { private static class AggregateAllOutputFormatterCallbackImpl extends AggregateAllOutputFormatterCallback { + // We only add to the CompactHashSet, so iteration order is the same as insertion order. private final Set result = CompactHashSet.create(); @Override @@ -51,12 +59,27 @@ public final class QueryUtil { } } + private static class ThreadSafeAggregateAllOutputFormatterCallbackImpl + implements ThreadSafeAggregateAllCallback { + private final Set result = Sets.newConcurrentHashSet(); + + @Override + public final void process(Iterable partialResult) { + Iterables.addAll(result, partialResult); + } + + @Override + public Set getResult() { + return result; + } + } + /** * Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial * results into one set. * *

Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with - * {@link #newAggregateAllCallback}. + * {@link #newThreadSafeAggregateAllCallback}. */ public static AggregateAllOutputFormatterCallback newAggregateAllOutputFormatterCallback() { @@ -66,11 +89,21 @@ public final class QueryUtil { /** * Returns a fresh {@link AggregateAllCallback}. * + *

Intended to be used by {@link QueryEnvironment#eval} implementations that make use of + * neither of streaming evaluation nor parallel evaluation. + */ + public static AggregateAllCallback newOrderedAggregateAllCallback() { + return new AggregateAllOutputFormatterCallbackImpl<>(); + } + + /** + * Returns a fresh {@link ThreadSafeAggregateAllCallback}. + * *

Intended to be used by {@link QueryExpression} implementations; contrast with * {@link #newAggregateAllOutputFormatterCallback}. */ - public static AggregateAllCallback newAggregateAllCallback() { - return new AggregateAllOutputFormatterCallbackImpl<>(); + public static AggregateAllCallback newThreadSafeAggregateAllCallback() { + return new ThreadSafeAggregateAllOutputFormatterCallbackImpl<>(); } /** @@ -81,7 +114,7 @@ public final class QueryUtil { public static Set evalAll( QueryEnvironment env, VariableContext context, QueryExpression expr) throws QueryException, InterruptedException { - AggregateAllCallback callback = newAggregateAllCallback(); + AggregateAllCallback callback = newThreadSafeAggregateAllCallback(); env.eval(expr, context, callback); return callback.getResult(); } @@ -139,4 +172,91 @@ public final class QueryUtil { return result.build(); } } + + /** + * An abstraction for sending results to a {@link Callback} based on partial query results. + * + *

Intended as a convenience for {@link QueryFunction#eval}/{@link QueryFunction#parEval} + * implementations. + */ + @ThreadSafe + interface Processor { + /** + * Processes the given {@code partialResult} and then invokes the given {@code callback} with + * a further result. + */ + void process(Iterable partialResult, Callback callback) + throws QueryException, InterruptedException; + } + + /** + * Similar to {@link Processor}, except the {@link #process} function also gets access to a + * {@link Uniquifier}. + */ + @ThreadSafe + interface ProcessorWithUniquifier { + /** + * Processes the given {@code partialResult} using the given {@code uniquifier} and then invokes + * the given {@code callback} with a further result. + */ + void process(Iterable partialResult, Uniquifier uniquifier, Callback callback) + throws QueryException, InterruptedException; + } + + /** + * Returns a new {@link Callback} that composes {@code callback} with {@code processor}. That is, + * this method returns a {@code c} such that {@code c.process(r)} calls + * {@code callback.process(r, callback)}. + * + *

The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback} + * is as well. + */ + public static Callback compose( + final Processor processor, + final Callback callback) { + return callback instanceof ThreadSafeCallback + ? new ThreadSafeCallback() { + @Override + public void process(Iterable partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, callback); + } + } + : new Callback() { + @Override + public void process(Iterable partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, callback); + } + }; + } + + /** + * Returns a new {@link Callback} that composes {@code callback} with {@code processor} (applied + * to {@code uniquifier}). That is, this method returns a {@code c} such that {@code c.process(r)} + * calls {@code callback.process(r, uniquifier, c)}. + * + *

The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback} is a + * {@link ThreadSafeCallback} and {@code uniquifier} is a {@link ThreadSafeUniquifier}. + */ + public static Callback compose( + final ProcessorWithUniquifier processor, + final Uniquifier uniquifier, + final Callback callback) { + return (callback instanceof ThreadSafeCallback) && (uniquifier instanceof ThreadSafeUniquifier) + ? new ThreadSafeCallback() { + @Override + public void process(Iterable partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, uniquifier, callback); + } + } + : new Callback() { + @Override + public void process(Iterable partialResult) + throws QueryException, InterruptedException { + processor.process(partialResult, uniquifier, callback); + } + }; + } } -- cgit v1.2.3