aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
diff options
context:
space:
mode:
authorGravatar Nathan Harmata <nharmata@google.com>2016-12-02 17:58:33 +0000
committerGravatar Kristina Chodorow <kchodorow@google.com>2016-12-02 19:09:07 +0000
commite0a330577d9fe98169645cb68d9fc22cc787eeb6 (patch)
tree9b1d3dc955baa99133511b9134715e9c73649a85 /src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
parenta110ac400190c90a45856f15482c8d0952c542f5 (diff)
For all function expressions of the form f(..., e1, ..., e2, ..., eK, ...), ensure that all of e1, e2, ..., and eK are elligble for parallel evaluation. This is _not_ the same as providing a parallel implementation of f, which we can do separately in followup CLs.
-- PiperOrigin-RevId: 140861694 MOS_MIGRATED_REVID=140861694
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java128
1 files changed, 124 insertions, 4 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
index 2d7be2ed74..b8087c51c5 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/engine/QueryUtil.java
@@ -17,9 +17,11 @@ package com.google.devtools.build.lib.query2.engine;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
+import com.google.common.collect.Sets;
import com.google.devtools.build.lib.collect.CompactHashSet;
import java.util.Collections;
import java.util.Set;
+import javax.annotation.concurrent.ThreadSafe;
/** Several query utilities to make easier to work with query callbacks and uniquifiers. */
public final class QueryUtil {
@@ -31,6 +33,11 @@ public final class QueryUtil {
Set<T> getResult();
}
+ /** A {@link AggregateAllCallback} that is a {@link ThreadSafeCallback}. */
+ public interface ThreadSafeAggregateAllCallback<T>
+ extends AggregateAllCallback<T>, ThreadSafeCallback<T> {
+ }
+
/** A {@link OutputFormatterCallback} that can aggregate all the partial results into one set. */
public abstract static class AggregateAllOutputFormatterCallback<T>
extends OutputFormatterCallback<T> implements AggregateAllCallback<T> {
@@ -38,6 +45,7 @@ public final class QueryUtil {
private static class AggregateAllOutputFormatterCallbackImpl<T>
extends AggregateAllOutputFormatterCallback<T> {
+ // We only add to the CompactHashSet, so iteration order is the same as insertion order.
private final Set<T> result = CompactHashSet.create();
@Override
@@ -51,12 +59,27 @@ public final class QueryUtil {
}
}
+ private static class ThreadSafeAggregateAllOutputFormatterCallbackImpl<T>
+ implements ThreadSafeAggregateAllCallback<T> {
+ private final Set<T> result = Sets.newConcurrentHashSet();
+
+ @Override
+ public final void process(Iterable<T> partialResult) {
+ Iterables.addAll(result, partialResult);
+ }
+
+ @Override
+ public Set<T> getResult() {
+ return result;
+ }
+ }
+
/**
* Returns a fresh {@link AggregateAllOutputFormatterCallback} that can aggregate all the partial
* results into one set.
*
* <p>Intended to be used by top-level evaluation of {@link QueryExpression}s; contrast with
- * {@link #newAggregateAllCallback}.
+ * {@link #newThreadSafeAggregateAllCallback}.
*/
public static <T> AggregateAllOutputFormatterCallback<T>
newAggregateAllOutputFormatterCallback() {
@@ -66,11 +89,21 @@ public final class QueryUtil {
/**
* Returns a fresh {@link AggregateAllCallback}.
*
+ * <p>Intended to be used by {@link QueryEnvironment#eval} implementations that make use of
+ * neither of streaming evaluation nor parallel evaluation.
+ */
+ public static <T> AggregateAllCallback<T> newOrderedAggregateAllCallback() {
+ return new AggregateAllOutputFormatterCallbackImpl<>();
+ }
+
+ /**
+ * Returns a fresh {@link ThreadSafeAggregateAllCallback}.
+ *
* <p>Intended to be used by {@link QueryExpression} implementations; contrast with
* {@link #newAggregateAllOutputFormatterCallback}.
*/
- public static <T> AggregateAllCallback<T> newAggregateAllCallback() {
- return new AggregateAllOutputFormatterCallbackImpl<>();
+ public static <T> AggregateAllCallback<T> newThreadSafeAggregateAllCallback() {
+ return new ThreadSafeAggregateAllOutputFormatterCallbackImpl<>();
}
/**
@@ -81,7 +114,7 @@ public final class QueryUtil {
public static <T> Set<T> evalAll(
QueryEnvironment<T> env, VariableContext<T> context, QueryExpression expr)
throws QueryException, InterruptedException {
- AggregateAllCallback<T> callback = newAggregateAllCallback();
+ AggregateAllCallback<T> callback = newThreadSafeAggregateAllCallback();
env.eval(expr, context, callback);
return callback.getResult();
}
@@ -139,4 +172,91 @@ public final class QueryUtil {
return result.build();
}
}
+
+ /**
+ * An abstraction for sending results to a {@link Callback} based on partial query results.
+ *
+ * <p>Intended as a convenience for {@link QueryFunction#eval}/{@link QueryFunction#parEval}
+ * implementations.
+ */
+ @ThreadSafe
+ interface Processor<T> {
+ /**
+ * Processes the given {@code partialResult} and then invokes the given {@code callback} with
+ * a further result.
+ */
+ void process(Iterable<T> partialResult, Callback<T> callback)
+ throws QueryException, InterruptedException;
+ }
+
+ /**
+ * Similar to {@link Processor}, except the {@link #process} function also gets access to a
+ * {@link Uniquifier}.
+ */
+ @ThreadSafe
+ interface ProcessorWithUniquifier<T> {
+ /**
+ * Processes the given {@code partialResult} using the given {@code uniquifier} and then invokes
+ * the given {@code callback} with a further result.
+ */
+ void process(Iterable<T> partialResult, Uniquifier<T> uniquifier, Callback<T> callback)
+ throws QueryException, InterruptedException;
+ }
+
+ /**
+ * Returns a new {@link Callback} that composes {@code callback} with {@code processor}. That is,
+ * this method returns a {@code c} such that {@code c.process(r)} calls
+ * {@code callback.process(r, callback)}.
+ *
+ * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback}
+ * is as well.
+ */
+ public static <T> Callback<T> compose(
+ final Processor<T> processor,
+ final Callback<T> callback) {
+ return callback instanceof ThreadSafeCallback
+ ? new ThreadSafeCallback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult)
+ throws QueryException, InterruptedException {
+ processor.process(partialResult, callback);
+ }
+ }
+ : new Callback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult)
+ throws QueryException, InterruptedException {
+ processor.process(partialResult, callback);
+ }
+ };
+ }
+
+ /**
+ * Returns a new {@link Callback} that composes {@code callback} with {@code processor} (applied
+ * to {@code uniquifier}). That is, this method returns a {@code c} such that {@code c.process(r)}
+ * calls {@code callback.process(r, uniquifier, c)}.
+ *
+ * <p>The returned {@link Callback} will be a {@link ThreadSafeCallback} if {@code callback} is a
+ * {@link ThreadSafeCallback} and {@code uniquifier} is a {@link ThreadSafeUniquifier}.
+ */
+ public static <T> Callback<T> compose(
+ final ProcessorWithUniquifier<T> processor,
+ final Uniquifier<T> uniquifier,
+ final Callback<T> callback) {
+ return (callback instanceof ThreadSafeCallback) && (uniquifier instanceof ThreadSafeUniquifier)
+ ? new ThreadSafeCallback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult)
+ throws QueryException, InterruptedException {
+ processor.process(partialResult, uniquifier, callback);
+ }
+ }
+ : new Callback<T>() {
+ @Override
+ public void process(Iterable<T> partialResult)
+ throws QueryException, InterruptedException {
+ processor.process(partialResult, uniquifier, callback);
+ }
+ };
+ }
}