aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/Chunker.java231
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java239
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java90
3 files changed, 337 insertions, 223 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
new file mode 100644
index 0000000000..afccdc68f4
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -0,0 +1,231 @@
+// Copyright 2016 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.devtools.build.lib.remote;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterators;
+import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ActionInputFileCache;
+import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.vfs.Path;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+/** An iterator-type object that transforms byte sources into a stream of BlobChunk messages. */
+public final class Chunker {
+ /** An Item is an opaque digestable source of bytes. */
+ interface Item {
+ ContentDigest getDigest() throws IOException;
+
+ InputStream getInputStream() throws IOException;
+ }
+
+ private final Iterator<Item> inputIterator;
+ private InputStream currentStream;
+ private final Set<ContentDigest> digests;
+ private ContentDigest digest;
+ private long bytesLeft;
+ private final int chunkSize;
+
+ Chunker(
+ Iterator<Item> inputIterator,
+ int chunkSize,
+ // If present, specifies which digests to output out of the whole input.
+ @Nullable Set<ContentDigest> digests)
+ throws IOException {
+ Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0");
+ this.digests = digests;
+ this.inputIterator = inputIterator;
+ this.chunkSize = chunkSize;
+ advanceInput();
+ }
+
+ Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
+ this(inputIterator, chunkSize, null);
+ }
+
+ Chunker(Item input, int chunkSize) throws IOException {
+ this(Iterators.singletonIterator(input), chunkSize, ImmutableSet.of(input.getDigest()));
+ }
+
+ private void advanceInput() throws IOException {
+ do {
+ if (inputIterator != null && inputIterator.hasNext()) {
+ Item input = inputIterator.next();
+ digest = input.getDigest();
+ currentStream = input.getInputStream();
+ bytesLeft = digest.getSizeBytes();
+ } else {
+ digest = null;
+ currentStream = null;
+ bytesLeft = 0;
+ }
+ } while (digest != null && digests != null && !digests.contains(digest));
+ }
+
+ /** True if the object has more BlobChunk elements. */
+ public boolean hasNext() {
+ return currentStream != null;
+ }
+
+ /** Consume the next BlobChunk element. */
+ public BlobChunk next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ BlobChunk.Builder chunk = BlobChunk.newBuilder();
+ long offset = digest.getSizeBytes() - bytesLeft;
+ if (offset == 0) {
+ chunk.setDigest(digest);
+ } else {
+ chunk.setOffset(offset);
+ }
+ if (bytesLeft > 0) {
+ byte[] blob = new byte[(int) Math.min(bytesLeft, (long) chunkSize)];
+ currentStream.read(blob);
+ chunk.setData(ByteString.copyFrom(blob));
+ bytesLeft -= blob.length;
+ }
+ if (bytesLeft == 0) {
+ currentStream.close();
+ advanceInput(); // Sets the current stream to null, if it was the last.
+ }
+ return chunk.build();
+ }
+
+ static Item toItem(final byte[] blob) {
+ return new Item() {
+ @Override
+ public ContentDigest getDigest() throws IOException {
+ return ContentDigests.computeDigest(blob);
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return new ByteArrayInputStream(blob);
+ }
+ };
+ }
+
+ static Item toItem(final Path file) {
+ return new Item() {
+ @Override
+ public ContentDigest getDigest() throws IOException {
+ return ContentDigests.computeDigest(file);
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return file.getInputStream();
+ }
+ };
+ }
+
+ static Item toItem(
+ final ActionInput input, final ActionInputFileCache inputCache, final Path execRoot) {
+ return new Item() {
+ @Override
+ public ContentDigest getDigest() throws IOException {
+ return ContentDigests.getDigestFromInputCache(input, inputCache);
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return execRoot.getRelative(input.getExecPathString()).getInputStream();
+ }
+ };
+ }
+
+ /**
+ * Create a Chunker from a given ActionInput, taking its digest from the provided
+ * ActionInputFileCache.
+ */
+ public static Chunker from(
+ ActionInput input, int chunkSize, ActionInputFileCache inputCache, Path execRoot)
+ throws IOException {
+ return new Chunker(toItem(input, inputCache, execRoot), chunkSize);
+ }
+
+ /** Create a Chunker from a given blob and chunkSize. */
+ public static Chunker from(byte[] blob, int chunkSize) throws IOException {
+ return new Chunker(toItem(blob), chunkSize);
+ }
+
+ /** Create a Chunker from a given Path and chunkSize. */
+ public static Chunker from(Path file, int chunkSize) throws IOException {
+ return new Chunker(toItem(file), chunkSize);
+ }
+
+ /**
+ * Create a Chunker from multiple input sources. The order of the sources provided to the Builder
+ * will be the same order they will be chunked by.
+ */
+ public static final class Builder {
+ private final ArrayList<Item> items = new ArrayList<>();
+ private Set<ContentDigest> digests = null;
+ private int chunkSize = 0;
+
+ public Chunker build() throws IOException {
+ return new Chunker(items.iterator(), chunkSize, digests);
+ }
+
+ public Builder chunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ return this;
+ }
+
+ /**
+ * Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS
+ * uploads where a list of digests missing from the CAS is known.
+ */
+ public Builder onlyUseDigests(Set<ContentDigest> digests) {
+ this.digests = digests;
+ return this;
+ }
+
+ public Builder addInput(byte[] blob) {
+ items.add(toItem(blob));
+ return this;
+ }
+
+ public Builder addInput(Path file) {
+ items.add(toItem(file));
+ return this;
+ }
+
+ public Builder addInput(ActionInput input, ActionInputFileCache inputCache, Path execRoot) {
+ items.add(toItem(input, inputCache, execRoot));
+ return this;
+ }
+
+ public Builder addAllInputs(
+ Collection<? extends ActionInput> inputs, ActionInputFileCache inputCache, Path execRoot) {
+ for (ActionInput input : inputs) {
+ items.add(toItem(input, inputCache, execRoot));
+ }
+ return this;
+ }
+ }
+}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
index 2d493e12ea..56d4fa2e18 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
@@ -17,7 +17,6 @@ package com.google.devtools.build.lib.remote;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterators;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException;
@@ -54,7 +53,6 @@ import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
@@ -62,7 +60,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -75,221 +72,10 @@ public class GrpcActionCache implements RemoteActionCache {
/** Channel over which to send gRPC CAS queries. */
private final GrpcCasInterface casIface;
+
private final GrpcExecutionCacheInterface iface;
private final RemoteOptions options;
- /** Reads from multiple sequential inputs and chunks the data into BlobChunks. */
- static interface BlobChunkIterator {
- boolean hasNext();
-
- BlobChunk next() throws IOException; // IOException can be a result of file read.
- }
-
- final class BlobChunkInlineIterator implements BlobChunkIterator {
- private final Iterator<byte[]> blobIterator;
- private final Set<ContentDigest> digests;
- private int offset;
- private ContentDigest digest;
- private byte[] currentBlob;
-
- public BlobChunkInlineIterator(Set<ContentDigest> digests, Iterator<byte[]> blobIterator) {
- this.digests = digests;
- this.blobIterator = blobIterator;
- advanceInput();
- }
-
- public BlobChunkInlineIterator(byte[] blob) {
- blobIterator = null;
- offset = 0;
- currentBlob = blob;
- digest = ContentDigests.computeDigest(currentBlob);
- digests = null;
- }
-
- private void advanceInput() {
- offset = 0;
- do {
- if (blobIterator != null && blobIterator.hasNext()) {
- currentBlob = blobIterator.next();
- digest = ContentDigests.computeDigest(currentBlob);
- } else {
- currentBlob = null;
- digest = null;
- }
- } while (digest != null && !digests.contains(digest));
- }
-
- @Override
- public boolean hasNext() {
- return currentBlob != null;
- }
-
- @Override
- public BlobChunk next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- BlobChunk.Builder chunk = BlobChunk.newBuilder();
- if (offset == 0) {
- chunk.setDigest(digest);
- } else {
- chunk.setOffset(offset);
- }
- int size = Math.min(currentBlob.length - offset, options.grpcMaxChunkSizeBytes);
- if (size > 0) {
- chunk.setData(ByteString.copyFrom(currentBlob, offset, size));
- offset += size;
- }
- if (offset >= currentBlob.length) {
- advanceInput();
- }
- return chunk.build();
- }
- }
-
- final class BlobChunkFileIterator implements BlobChunkIterator {
- private final Iterator<Path> fileIterator;
- private InputStream currentStream;
- private final Set<ContentDigest> digests;
- private ContentDigest digest;
- private long bytesLeft;
-
- public BlobChunkFileIterator(Set<ContentDigest> digests, Iterator<Path> fileIterator)
- throws IOException {
- this.digests = digests;
- this.fileIterator = fileIterator;
- advanceInput();
- }
-
- public BlobChunkFileIterator(Path file) throws IOException {
- fileIterator = Iterators.singletonIterator(file);
- digests = ImmutableSet.of(ContentDigests.computeDigest(file));
- advanceInput();
- }
-
- private void advanceInput() throws IOException {
- do {
- if (fileIterator != null && fileIterator.hasNext()) {
- Path file = fileIterator.next();
- digest = ContentDigests.computeDigest(file);
- currentStream = file.getInputStream();
- bytesLeft = digest.getSizeBytes();
- } else {
- digest = null;
- currentStream = null;
- bytesLeft = 0;
- }
- } while (digest != null && !digests.contains(digest));
- }
-
- @Override
- public boolean hasNext() {
- return currentStream != null;
- }
-
- @Override
- public BlobChunk next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- BlobChunk.Builder chunk = BlobChunk.newBuilder();
- long offset = digest.getSizeBytes() - bytesLeft;
- if (offset == 0) {
- chunk.setDigest(digest);
- } else {
- chunk.setOffset(offset);
- }
- if (bytesLeft > 0) {
- byte[] blob = new byte[(int) Math.min(bytesLeft, options.grpcMaxChunkSizeBytes)];
- currentStream.read(blob);
- chunk.setData(ByteString.copyFrom(blob));
- bytesLeft -= blob.length;
- }
- if (bytesLeft == 0) {
- currentStream.close();
- advanceInput();
- }
- return chunk.build();
- }
- }
-
- final class BlobChunkActionInputIterator implements BlobChunkIterator {
- private final Iterator<? extends ActionInput> inputIterator;
- private final ActionInputFileCache inputCache;
- private final Path execRoot;
- private InputStream currentStream;
- private final Set<ContentDigest> digests;
- private ContentDigest digest;
- private long bytesLeft;
-
- public BlobChunkActionInputIterator(
- Set<ContentDigest> digests,
- Path execRoot,
- Iterator<? extends ActionInput> inputIterator,
- ActionInputFileCache inputCache)
- throws IOException {
- this.digests = digests;
- this.inputIterator = inputIterator;
- this.inputCache = inputCache;
- this.execRoot = execRoot;
- advanceInput();
- }
-
- public BlobChunkActionInputIterator(
- ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException {
- inputIterator = Iterators.singletonIterator(input);
- digests = ImmutableSet.of(ContentDigests.getDigestFromInputCache(input, inputCache));
- this.inputCache = inputCache;
- this.execRoot = execRoot;
- advanceInput();
- }
-
- private void advanceInput() throws IOException {
- do {
- if (inputIterator != null && inputIterator.hasNext()) {
- ActionInput input = inputIterator.next();
- digest = ContentDigests.getDigestFromInputCache(input, inputCache);
- currentStream = execRoot.getRelative(input.getExecPathString()).getInputStream();
- bytesLeft = digest.getSizeBytes();
- } else {
- digest = null;
- currentStream = null;
- bytesLeft = 0;
- }
- } while (digest != null && !digests.contains(digest));
- }
-
- @Override
- public boolean hasNext() {
- return currentStream != null;
- }
-
- @Override
- public BlobChunk next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- BlobChunk.Builder chunk = BlobChunk.newBuilder();
- long offset = digest.getSizeBytes() - bytesLeft;
- if (offset == 0) {
- chunk.setDigest(digest);
- } else {
- chunk.setOffset(offset);
- }
- if (bytesLeft > 0) {
- byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)];
- currentStream.read(blob);
- chunk.setData(ByteString.copyFrom(blob));
- bytesLeft -= blob.length;
- }
- if (bytesLeft == 0) {
- currentStream.close();
- advanceInput();
- }
- return chunk.build();
- }
- }
-
public GrpcActionCache(
RemoteOptions options, GrpcCasInterface casIface, GrpcExecutionCacheInterface iface) {
this.options = options;
@@ -352,8 +138,11 @@ public class GrpcActionCache implements RemoteActionCache {
if (!actionInputs.isEmpty()) {
uploadChunks(
actionInputs.size(),
- new BlobChunkActionInputIterator(
- missingDigests, execRoot, actionInputs.iterator(), repository.getInputFileCache()));
+ new Chunker.Builder()
+ .chunkSize(options.grpcMaxChunkSizeBytes)
+ .addAllInputs(actionInputs, repository.getInputFileCache(), execRoot)
+ .onlyUseDigests(missingDigests)
+ .build());
}
}
@@ -460,12 +249,14 @@ public class GrpcActionCache implements RemoteActionCache {
public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result)
throws IOException, InterruptedException {
ArrayList<ContentDigest> digests = new ArrayList<>();
+ Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes);
for (Path file : files) {
digests.add(ContentDigests.computeDigest(file));
+ b.addInput(file);
}
ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
if (!missing.isEmpty()) {
- uploadChunks(missing.size(), new BlobChunkFileIterator(missing, files.iterator()));
+ uploadChunks(missing.size(), b.onlyUseDigests(missing).build());
}
int index = 0;
for (Path file : files) {
@@ -495,7 +286,7 @@ public class GrpcActionCache implements RemoteActionCache {
ContentDigest digest = ContentDigests.computeDigest(file);
ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploadChunks(1, new BlobChunkFileIterator(file));
+ uploadChunks(1, Chunker.from(file, options.grpcMaxChunkSizeBytes));
}
return digest;
}
@@ -513,7 +304,7 @@ public class GrpcActionCache implements RemoteActionCache {
ContentDigest digest = ContentDigests.getDigestFromInputCache(input, inputCache);
ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploadChunks(1, new BlobChunkActionInputIterator(input, execRoot, inputCache));
+ uploadChunks(1, Chunker.from(input, options.grpcMaxChunkSizeBytes, inputCache, execRoot));
}
return digest;
}
@@ -549,7 +340,7 @@ public class GrpcActionCache implements RemoteActionCache {
}
}
- private void uploadChunks(int numItems, BlobChunkIterator blobs)
+ private void uploadChunks(int numItems, Chunker blobs)
throws InterruptedException, IOException {
CountDownLatch finishLatch = new CountDownLatch(numItems); // Maximal number of batches.
AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
@@ -610,13 +401,15 @@ public class GrpcActionCache implements RemoteActionCache {
public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
throws InterruptedException {
ArrayList<ContentDigest> digests = new ArrayList<>();
+ Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes);
for (byte[] blob : blobs) {
digests.add(ContentDigests.computeDigest(blob));
+ b.addInput(blob);
}
ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
try {
if (!missing.isEmpty()) {
- uploadChunks(missing.size(), new BlobChunkInlineIterator(missing, blobs.iterator()));
+ uploadChunks(missing.size(), b.onlyUseDigests(missing).build());
}
return ImmutableList.copyOf(digests);
} catch (IOException e) {
@@ -631,7 +424,7 @@ public class GrpcActionCache implements RemoteActionCache {
ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
try {
if (!missing.isEmpty()) {
- uploadChunks(1, new BlobChunkInlineIterator(blob));
+ uploadChunks(1, Chunker.from(blob, options.grpcMaxChunkSizeBytes));
}
return digest;
} catch (IOException e) {
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
new file mode 100644
index 0000000000..13be580204
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
@@ -0,0 +1,90 @@
+// Copyright 2015 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static com.google.common.truth.Truth.assertThat;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
+import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link Chunker}. */
+@RunWith(JUnit4.class)
+public class ChunkerTest {
+
+ static BlobChunk buildChunk(long offset, String data) {
+ return BlobChunk.newBuilder().setOffset(offset).setData(ByteString.copyFromUtf8(data)).build();
+ }
+
+ static BlobChunk buildChunk(ContentDigest digest, String data) {
+ return BlobChunk.newBuilder().setDigest(digest).setData(ByteString.copyFromUtf8(data)).build();
+ }
+
+ @Test
+ public void testChunker() throws Exception {
+ byte[] b1 = "abcdefg".getBytes(UTF_8);
+ byte[] b2 = "hij".getBytes(UTF_8);
+ byte[] b3 = "klmnopqrstuvwxyz".getBytes(UTF_8);
+ ContentDigest d1 = ContentDigests.computeDigest(b1);
+ ContentDigest d2 = ContentDigests.computeDigest(b2);
+ ContentDigest d3 = ContentDigests.computeDigest(b3);
+ Chunker c = new Chunker.Builder().chunkSize(5).addInput(b1).addInput(b2).addInput(b3).build();
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(d1, "abcde"));
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(5, "fg"));
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(d2, "hij"));
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(d3, "klmno"));
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(5, "pqrst"));
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(10, "uvwxy"));
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(15, "z"));
+ assertThat(c.hasNext()).isFalse();
+ }
+
+ @Test
+ public void testIgnoresUnmentionedDigests() throws Exception {
+ byte[] b1 = "a".getBytes(UTF_8);
+ byte[] b2 = "bb".getBytes(UTF_8);
+ byte[] b3 = "ccc".getBytes(UTF_8);
+ byte[] b4 = "dddd".getBytes(UTF_8);
+ ContentDigest d1 = ContentDigests.computeDigest(b1);
+ ContentDigest d3 = ContentDigests.computeDigest(b3);
+ Chunker c =
+ new Chunker.Builder()
+ .chunkSize(2)
+ .onlyUseDigests(ImmutableSet.of(d1, d3))
+ .addInput(b1)
+ .addInput(b2)
+ .addInput(b3)
+ .addInput(b4)
+ .build();
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(d1, "a"));
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(d3, "cc"));
+ assertThat(c.hasNext()).isTrue();
+ assertThat(c.next()).isEqualTo(buildChunk(2, "c"));
+ assertThat(c.hasNext()).isFalse();
+ }
+}