diff options
author | Miguel Alcon Pinto <malcon@google.com> | 2015-11-18 16:05:17 +0000 |
---|---|---|
committer | Damien Martin-Guillerez <dmarting@google.com> | 2015-11-19 09:58:46 +0000 |
commit | 47ea94897a06fac211609d62b92f0548e3fdfb4d (patch) | |
tree | a6810a96f89aa36d70cdbff9eecec23b233ea9df /src | |
parent | 1246eb37678137f18418a8f92afb2f3677123b87 (diff) |
Modify SkyQueryEnvironment to work in stream mode. This streaming system is pretty simple: It aggregates up to 10k elements and then it notifies the parent.
--
MOS_MIGRATED_REVID=108144202
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java | 41 |
1 files changed, 38 insertions, 3 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java index 21e999bc50..5cb7a326b3 100644 --- a/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java +++ b/src/main/java/com/google/devtools/build/lib/query2/SkyQueryEnvironment.java @@ -52,7 +52,6 @@ import com.google.devtools.build.lib.query2.engine.QueryEvalResult; import com.google.devtools.build.lib.query2.engine.QueryException; import com.google.devtools.build.lib.query2.engine.QueryExpression; import com.google.devtools.build.lib.query2.engine.QueryUtil.AbstractUniquifier; -import com.google.devtools.build.lib.query2.engine.QueryUtil.AggregateAllCallback; import com.google.devtools.build.lib.query2.engine.Uniquifier; import com.google.devtools.build.lib.skyframe.FileValue; import com.google.devtools.build.lib.skyframe.GraphBackedRecursivePackageProvider; @@ -74,6 +73,7 @@ import com.google.devtools.build.skyframe.WalkableGraph; import com.google.devtools.build.skyframe.WalkableGraph.WalkableGraphFactory; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Collection; import java.util.Deque; import java.util.HashMap; @@ -305,9 +305,11 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { @Override public void eval(QueryExpression expr, Callback<Target> callback) throws QueryException, InterruptedException { - AggregateAllCallback<Target> aggregator = new AggregateAllCallback<>(); + // 10k is likely a good balance between using batch efficiently and not blowing up memory. + BatchStreamedCallback aggregator = new BatchStreamedCallback(callback, 10000, + createUniquifier()); expr.eval(this, aggregator); - callback.process(aggregator.getResult()); + aggregator.processLastPending(); } @Override @@ -691,4 +693,37 @@ public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> { .add(new RBuildFilesFunction()) .build(); } + + private static class BatchStreamedCallback implements Callback<Target> { + + private final Callback<Target> callback; + private final Uniquifier<Target> uniquifier; + private List<Target> pending = new ArrayList<>(); + private int batchThreshold; + + private BatchStreamedCallback(Callback<Target> callback, int batchThreshold, + Uniquifier<Target> uniquifier) { + this.callback = callback; + this.batchThreshold = batchThreshold; + this.uniquifier = uniquifier; + } + + @Override + public void process(Iterable<Target> partialResult) + throws QueryException, InterruptedException { + Preconditions.checkNotNull(pending, "Reuse of the callback is not allowed"); + pending.addAll(uniquifier.unique(partialResult)); + if (pending.size() >= batchThreshold) { + callback.process(pending); + pending = new ArrayList<>(); + } + } + + private void processLastPending() throws QueryException, InterruptedException { + if (!pending.isEmpty()) { + callback.process(pending); + pending = null; + } + } + } } |