aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java/com/google/devtools/build/lib/query2
diff options
context:
space:
mode:
authorGravatar Miguel Alcon Pinto <malcon@google.com>2015-11-18 16:05:17 +0000
committerGravatar Damien Martin-Guillerez <dmarting@google.com>2015-11-19 09:58:46 +0000
commit47ea94897a06fac211609d62b92f0548e3fdfb4d (patch)
treea6810a96f89aa36d70cdbff9eecec23b233ea9df /src/main/java/com/google/devtools/build/lib/query2
parent1246eb37678137f18418a8f92afb2f3677123b87 (diff)
Modify SkyQueryEnvironment to work in stream mode. This streaming system is pretty simple: It aggregates up to 10k elements and then it notifies the parent.
-- MOS_MIGRATED_REVID=108144202
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/query2')
-rw-r--r--src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java41
1 files changed, 38 insertions, 3 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
index 21e999bc50..5cb7a326b3 100644
--- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
+++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java
@@ -52,7 +52,6 @@ import com.google.devtools.build.lib.query2.engine.QueryEvalResult;
import com.google.devtools.build.lib.query2.engine.QueryException;
import com.google.devtools.build.lib.query2.engine.QueryExpression;
import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier;
-import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback;
import com.google.devtools.build.lib.query2.engine.Uniquifier;
import com.google.devtools.build.lib.skyframe.FileValue;
import com.google.devtools.build.lib.skyframe.GraphBackedRecursivePackageProvider;
@@ -74,6 +73,7 @@ import com.google.devtools.build.skyframe.WalkableGraph;
import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
@@ -305,9 +305,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
@Override
public void eval(QueryExpression expr, Callback<Target> callback)
throws QueryException, InterruptedException {
- AggregateAllCallback<Target> aggregator = new AggregateAllCallback<>();
+ // 10k is likely a good balance between using batch efficiently and not blowing up memory.
+ BatchStreamedCallback aggregator = new BatchStreamedCallback(callback, 10000,
+ createUniquifier());
expr.eval(this, aggregator);
- callback.process(aggregator.getResult());
+ aggregator.processLastPending();
}
@Override
@@ -691,4 +693,37 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
.add(new RBuildFilesFunction())
.build();
}
+
+ private static class BatchStreamedCallback implements Callback<Target> {
+
+ private final Callback<Target> callback;
+ private final Uniquifier<Target> uniquifier;
+ private List<Target> pending = new ArrayList<>();
+ private int batchThreshold;
+
+ private BatchStreamedCallback(Callback<Target> callback, int batchThreshold,
+ Uniquifier<Target> uniquifier) {
+ this.callback = callback;
+ this.batchThreshold = batchThreshold;
+ this.uniquifier = uniquifier;
+ }
+
+ @Override
+ public void process(Iterable<Target> partialResult)
+ throws QueryException, InterruptedException {
+ Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed");
+ pending.addAll(uniquifier.unique(partialResult));
+ if (pending.size() >= batchThreshold) {
+ callback.process(pending);
+ pending = new ArrayList<>();
+ }
+ }
+
+ private void processLastPending() throws QueryException, InterruptedException {
+ if (!pending.isEmpty()) {
+ callback.process(pending);
+ pending = null;
+ }
+ }
+ }
}