aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar buchgr <buchgr@google.com>2017-07-12 10:29:27 +0200
committerGravatar László Csomor <laszlocsomor@google.com>2017-07-12 10:52:22 +0200
commit226510b85b9c0b1465aa860bc6cdd792b01b3412 (patch)
tree7d512eb26f18dfa918c20ccf931be8c654107134
parent7a1e3a534efccc86dd86ba9d253f0a6801df7158 (diff)
remote: Rewrite Chunker
- Remove the Chunker.Builder and use the Chunker constructors instead. - Fix a correctness bug, where the chunker would ignore the return value of InputStream.read(byte[]). - Have Chunk.getData() return a ByteString as opposed to a byte[]. All callsides need ByteString objects and this change makes the subsequent change possible. - Have the Chunker use a preallocated byte[] in order to avoid allocating a new one on every call to next(). RELNOTES: None. PiperOrigin-RevId: 161637158
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java32
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/Chunker.java375
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java20
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java49
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java151
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java12
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java5
7 files changed, 283 insertions, 361 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
index 19b8929efb..599a282f54 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java
@@ -33,7 +33,6 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.protobuf.ByteString;
import io.grpc.CallCredentials;
import io.grpc.CallOptions;
import io.grpc.Channel;
@@ -108,7 +107,7 @@ final class ByteStreamUploader {
}
/**
- * Uploads a BLOB, as provided by the {@link Chunker.SingleSourceBuilder}, to the remote {@code
+ * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code
* ByteStream} service. The call blocks until the upload is complete, or throws an {@link
* Exception} in case of error.
*
@@ -121,9 +120,9 @@ final class ByteStreamUploader {
* @throws IOException when reading of the {@link Chunker}s input source fails
* @throws RetryException when the upload failed after a retry
*/
- public void uploadBlob(Chunker.SingleSourceBuilder chunkerBuilder)
+ public void uploadBlob(Chunker chunker)
throws IOException, InterruptedException {
- uploadBlobs(singletonList(chunkerBuilder));
+ uploadBlobs(singletonList(chunker));
}
/**
@@ -141,12 +140,12 @@ final class ByteStreamUploader {
* @throws IOException when reading of the {@link Chunker}s input source fails
* @throws RetryException when the upload failed after a retry
*/
- public void uploadBlobs(Iterable<Chunker.SingleSourceBuilder> chunkerBuilders)
+ public void uploadBlobs(Iterable<Chunker> chunkers)
throws IOException, InterruptedException {
List<ListenableFuture<Void>> uploads = new ArrayList<>();
- for (Chunker.SingleSourceBuilder chunkerBuilder : chunkerBuilders) {
- uploads.add(uploadBlobAsync(chunkerBuilder));
+ for (Chunker chunker : chunkers) {
+ uploads.add(uploadBlobAsync(chunker));
}
try {
@@ -189,9 +188,9 @@ final class ByteStreamUploader {
}
@VisibleForTesting
- ListenableFuture<Void> uploadBlobAsync(Chunker.SingleSourceBuilder chunkerBuilder)
+ ListenableFuture<Void> uploadBlobAsync(Chunker chunker)
throws IOException {
- Digest digest = checkNotNull(chunkerBuilder.getDigest());
+ Digest digest = checkNotNull(chunker.digest());
synchronized (lock) {
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
@@ -207,7 +206,7 @@ final class ByteStreamUploader {
},
MoreExecutors.directExecutor());
startAsyncUploadWithRetry(
- chunkerBuilder, retrier.newBackoff(), (SettableFuture<Void>) uploadResult);
+ chunker, retrier.newBackoff(), (SettableFuture<Void>) uploadResult);
uploadsInProgress.put(digest, uploadResult);
}
return uploadResult;
@@ -222,7 +221,7 @@ final class ByteStreamUploader {
}
private void startAsyncUploadWithRetry(
- Chunker.SingleSourceBuilder chunkerBuilder,
+ Chunker chunker,
Retrier.Backoff backoffTimes,
SettableFuture<Void> overallUploadResult) {
@@ -242,13 +241,13 @@ final class ByteStreamUploader {
RetryException error = new RetryException(cause, backoffTimes.getRetryAttempts());
overallUploadResult.setException(error);
} else {
- retryAsyncUpload(nextDelayMillis, chunkerBuilder, backoffTimes, overallUploadResult);
+ retryAsyncUpload(nextDelayMillis, chunker, backoffTimes, overallUploadResult);
}
}
private void retryAsyncUpload(
long nextDelayMillis,
- Chunker.SingleSourceBuilder chunkerBuilder,
+ Chunker chunker,
Retrier.Backoff backoffTimes,
SettableFuture<Void> overallUploadResult) {
try {
@@ -256,7 +255,7 @@ final class ByteStreamUploader {
retryService.schedule(
() ->
startAsyncUploadWithRetry(
- chunkerBuilder, backoffTimes, overallUploadResult),
+ chunker, backoffTimes, overallUploadResult),
nextDelayMillis,
MILLISECONDS);
// In case the scheduled execution errors, we need to notify the overallUploadResult.
@@ -278,9 +277,8 @@ final class ByteStreamUploader {
}
};
- Chunker chunker;
try {
- chunker = chunkerBuilder.build();
+ chunker.reset();
} catch (IOException e) {
overallUploadResult.setException(e);
return;
@@ -384,7 +382,7 @@ final class ByteStreamUploader {
boolean isLastChunk = !chunker.hasNext();
WriteRequest request =
requestBuilder
- .setData(ByteString.copyFrom(chunk.getData()))
+ .setData(chunk.getData())
.setWriteOffset(chunk.getOffset())
.setFinishWrite(isLastChunk)
.build();
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index d880f3f7c1..ccc07eaf05 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -14,42 +14,41 @@
package com.google.devtools.build.lib.remote;
-import static com.google.devtools.build.lib.util.Preconditions.checkArgument;
-import static com.google.devtools.build.lib.util.Preconditions.checkState;
+import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterators;
+import com.google.common.base.Throwables;
+import com.google.common.io.ByteStreams;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
-import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
-import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.Set;
+import java.util.function.Supplier;
-/** An iterator-type object that transforms byte sources into a stream of Chunks. */
+/**
+ * Splits an {@link InputStream} into one or more {@link Chunk}s of at most {@code chunkSize} bytes.
+ */
public final class Chunker {
- // This is effectively final, should be changed only in unit-tests!
+
+ private static final Chunk EMPTY_CHUNK =
+ new Chunk(Digests.computeDigest(new byte[0]), ByteString.EMPTY, 0);
+
private static int defaultChunkSize = 1024 * 16;
- private static final byte[] EMPTY_BLOB = new byte[0];
+ /** This method must only be called in tests! */
@VisibleForTesting
static void setDefaultChunkSizeForTesting(int value) {
defaultChunkSize = value;
}
- public static int getDefaultChunkSize() {
+ static int getDefaultChunkSize() {
return defaultChunkSize;
}
@@ -58,16 +57,26 @@ public final class Chunker {
private final Digest digest;
private final long offset;
- // TODO(olaola): consider saving data in a different format that byte[].
- private final byte[] data;
+ private final ByteString data;
- @VisibleForTesting
- Chunk(Digest digest, byte[] data, long offset) {
+ private Chunk(Digest digest, ByteString data, long offset) {
this.digest = digest;
this.data = data;
this.offset = offset;
}
+ public Digest getDigest() {
+ return digest;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public ByteString getData() {
+ return data;
+ }
+
@Override
public boolean equals(Object o) {
if (o == this) {
@@ -79,280 +88,154 @@ public final class Chunker {
Chunk other = (Chunk) o;
return other.offset == offset
&& other.digest.equals(digest)
- && Arrays.equals(other.data, data);
+ && other.data.equals(data);
}
@Override
public int hashCode() {
- return Objects.hash(digest, offset, Arrays.hashCode(data));
- }
-
- public Digest getDigest() {
- return digest;
- }
-
- public long getOffset() {
- return offset;
- }
-
- // This returns a mutable copy, for efficiency.
- public byte[] getData() {
- return data;
+ return Objects.hash(digest, offset, data);
}
}
- /** An Item is an opaque digestable source of bytes. */
- interface Item {
- Digest getDigest() throws IOException;
-
- InputStream getInputStream() throws IOException;
- }
-
- private final Iterator<Item> inputIterator;
- private InputStream currentStream;
- private Digest digest;
- private long bytesLeft;
+ private final Supplier<InputStream> dataSupplier;
+ private final Digest digest;
private final int chunkSize;
- Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
- Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0");
- this.inputIterator = inputIterator;
- this.chunkSize = chunkSize;
- advanceInput();
- }
-
- Chunker(Item input, int chunkSize) throws IOException {
- this(Iterators.singletonIterator(input), chunkSize);
- }
-
- public void advanceInput() throws IOException {
- if (inputIterator.hasNext()) {
- Item input = inputIterator.next();
- digest = input.getDigest();
- currentStream = input.getInputStream();
- bytesLeft = digest.getSizeBytes();
- } else {
- digest = null;
- currentStream = null;
- bytesLeft = 0;
- }
- }
+ private InputStream data;
+ private long offset;
+ private byte[] chunkCache;
- /** True if the object has more {@link Chunk} elements. */
- public boolean hasNext() {
- return currentStream != null;
+ public Chunker(byte[] data) throws IOException {
+ this(data, getDefaultChunkSize());
}
- /** Consume the next Chunk element. */
- public Chunk next() throws IOException {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- long offset = digest.getSizeBytes() - bytesLeft;
- byte[] blob = EMPTY_BLOB;
- if (bytesLeft > 0) {
- blob = new byte[(int) Math.min(bytesLeft, chunkSize)];
- currentStream.read(blob);
- bytesLeft -= blob.length;
- }
- Chunk result = new Chunk(digest, blob, offset);
- if (bytesLeft == 0) {
- currentStream.close();
- advanceInput(); // Sets the current stream to null, if it was the last.
- }
- return result;
+ public Chunker(byte[] data, int chunkSize) throws IOException {
+ this(() -> new ByteArrayInputStream(data), Digests.computeDigest(data), chunkSize);
}
- private static Item toItem(final byte[] blob) {
- return new Item() {
- Digest digest = null;
-
- @Override
- public Digest getDigest() throws IOException {
- if (digest == null) {
- digest = Digests.computeDigest(blob);
- }
- return digest;
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return new ByteArrayInputStream(blob);
- }
- };
+ public Chunker(Path file) throws IOException {
+ this(file, getDefaultChunkSize());
}
- private static Item toItem(final Path file) {
- return new Item() {
- Digest digest = null;
-
- @Override
- public Digest getDigest() throws IOException {
- if (digest == null) {
- digest = Digests.computeDigest(file);
- }
- return digest;
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
+ public Chunker(Path file, int chunkSize) throws IOException {
+ this(() -> {
+ try {
return file.getInputStream();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- };
- }
-
- private static Item toItem(
- final ActionInput input, final ActionInputFileCache inputCache, final Path execRoot) {
- if (input instanceof VirtualActionInput) {
- return toItem((VirtualActionInput) input);
- }
- return new Item() {
- @Override
- public Digest getDigest() throws IOException {
- return Digests.getDigestFromInputCache(input, inputCache);
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return execRoot.getRelative(input.getExecPathString()).getInputStream();
- }
- };
+ }, Digests.computeDigest(file), chunkSize);
}
- private static Item toItem(final VirtualActionInput input) {
- return new Item() {
- Digest digest = null;
-
- @Override
- public Digest getDigest() throws IOException {
- if (digest == null) {
- digest = Digests.computeDigest(input);
- }
- return digest;
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- ByteArrayOutputStream buffer = new ByteArrayOutputStream();
- input.writeTo(buffer);
- return new ByteArrayInputStream(buffer.toByteArray());
- }
- };
+ public Chunker(ActionInput actionInput, ActionInputFileCache inputCache, Path execRoot) throws
+ IOException{
+ this(actionInput, inputCache, execRoot, getDefaultChunkSize());
}
- private static class MemberOf implements Predicate<Item> {
- private final Set<Digest> digests;
-
- public MemberOf(Set<Digest> digests) {
- this.digests = digests;
- }
-
- @Override
- public boolean apply(Item item) {
+ public Chunker(ActionInput actionInput, ActionInputFileCache inputCache, Path execRoot,
+ int chunkSize)
+ throws IOException {
+ this(() -> {
try {
- return digests.contains(item.getDigest());
+ return execRoot.getRelative(actionInput.getExecPathString()).getInputStream();
} catch (IOException e) {
throw new RuntimeException(e);
}
- }
+ }, Digests.getDigestFromInputCache(actionInput, inputCache), chunkSize);
}
- /**
- * Creates a Chunker from a single input source.
- *
- * <p>As we phase out usages of multiple input sources, this will soon completely replace the
- * multiple inputs Builder.
- */
- public static final class SingleSourceBuilder {
- private Item item;
- private int chunkSize = getDefaultChunkSize();
-
- public SingleSourceBuilder chunkSize(int chunkSize) {
- checkArgument(chunkSize > 0, "chunkSize must be gt 0.");
- this.chunkSize = chunkSize;
- return this;
- }
-
- public SingleSourceBuilder input(byte[] blob) {
- item = toItem(blob);
- return this;
- }
+ private Chunker(Supplier<InputStream> dataSupplier, Digest digest, int chunkSize)
+ throws IOException {
+ this.dataSupplier = checkNotNull(dataSupplier);
+ this.digest = checkNotNull(digest);
+ this.chunkSize = chunkSize;
+ reset();
+ }
- public SingleSourceBuilder input(Path file) {
- item = toItem(file);
- return this;
- }
+ public Digest digest() {
+ return digest;
+ }
- public SingleSourceBuilder input(ActionInput input, ActionInputFileCache inputCache,
- Path execRoot) {
- item = toItem(input, inputCache, execRoot);
- return this;
+ /**
+ * Reset the {@link Chunker} state to when it was newly constructed.
+ */
+ public void reset() throws IOException {
+ if (data != null) {
+ data.close();
}
-
- public Digest getDigest() throws IOException {
- checkState(item != null, "Need to specify an input source first.");
- return item.getDigest();
+ try {
+ data = dataSupplier.get();
+ } catch (RuntimeException e) {
+ Throwables.propagateIfPossible(e.getCause(), IOException.class);
+ throw e;
}
+ offset = 0;
+ chunkCache = null;
+ }
- public Chunker build() throws IOException {
- checkState(item != null, "No input source provided.");
- return new Chunker(item, chunkSize);
- }
+ /**
+ * Returns {@code true} if a subsequent call to {@link #next()} returns a {@link Chunk} object;
+ */
+ public boolean hasNext() {
+ return data != null;
}
/**
- * Create a Chunker from multiple input sources. The order of the sources provided to the Builder
- * will be the same order they will be chunked by.
+ * Returns the next {@link Chunk} or throws a {@link NoSuchElementException} if no data is left.
+ *
+ * <p>Always call {@link #hasNext()} before calling this method.
+ *
+ * <p>Zero byte inputs are treated special. Instead of throwing a {@link NoSuchElementException}
+ * on the first call to {@link #next()}, a {@link Chunk} with an empty {@link ByteString} is
+ * returned.
*/
- @Deprecated
- public static final class Builder {
- private final ImmutableList.Builder<Item> items = ImmutableList.builder();
- private Set<Digest> digests = null;
- private int chunkSize = getDefaultChunkSize();
-
- public Chunker build() throws IOException {
- return new Chunker(
- digests == null
- ? items.build().iterator()
- : Iterators.filter(items.build().iterator(), new MemberOf(digests)),
- chunkSize);
+ public Chunk next() throws IOException {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
}
- public Builder chunkSize(int chunkSize) {
- this.chunkSize = chunkSize;
- return this;
+ if (digest.getSizeBytes() == 0) {
+ data = null;
+ return EMPTY_CHUNK;
}
- /**
- * Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS
- * uploads where a list of digests missing from the CAS is known.
- */
- public Builder onlyUseDigests(Set<Digest> digests) {
- this.digests = digests;
- return this;
+ // The cast to int is safe, because the return value is capped at chunkSize.
+ int bytesToRead = (int) Math.min(bytesLeft(), chunkSize);
+ if (bytesToRead == 0) {
+ chunkCache = null;
+ data = null;
+ throw new NoSuchElementException();
}
- public Builder addInput(byte[] blob) {
- items.add(toItem(blob));
- return this;
+ if (chunkCache == null) {
+ // Lazily allocate it in order to save memory on small data.
+ // 1) bytesToRead < chunkSize: There will only ever be one next() call.
+ // 2) bytesToRead == chunkSize: chunkCache will be set to its biggest possible value.
+ // 3) bytestoRead > chunkSize: Not possible, due to Math.min above.
+ chunkCache = new byte[bytesToRead];
}
- public Builder addInput(Path file) {
- items.add(toItem(file));
- return this;
+ long offsetBefore = offset;
+ try {
+ ByteStreams.readFully(data, chunkCache, 0, bytesToRead);
+ } catch (EOFException e) {
+ throw new IllegalStateException("Reached EOF, but expected "
+ + bytesToRead + " bytes.", e);
}
+ offset += bytesToRead;
- public Builder addInput(ActionInput input, ActionInputFileCache inputCache, Path execRoot) {
- items.add(toItem(input, inputCache, execRoot));
- return this;
- }
+ ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesToRead);
- public Builder addAllInputs(
- Collection<? extends ActionInput> inputs, ActionInputFileCache inputCache, Path execRoot) {
- for (ActionInput input : inputs) {
- items.add(toItem(input, inputCache, execRoot));
- }
- return this;
+ if (bytesLeft() == 0) {
+ data.close();
+ data = null;
+ chunkCache = null;
}
+
+ return new Chunk(digest, blob, offsetBefore);
+ }
+
+ private long bytesLeft() {
+ return digest.getSizeBytes() - offset;
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
index 615e9f83d7..5b21e249b5 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -27,7 +27,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.Chunker.SingleSourceBuilder;
import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.Preconditions;
@@ -172,14 +171,13 @@ public class GrpcRemoteCache implements RemoteActionCache {
}
uploadBlob(command.toByteArray());
if (!actionInputs.isEmpty()) {
- List<Chunker.SingleSourceBuilder> inputsToUpload = new ArrayList<>();
+ List<Chunker> inputsToUpload = new ArrayList<>();
ActionInputFileCache inputFileCache = repository.getInputFileCache();
for (ActionInput actionInput : actionInputs) {
Digest digest = Digests.getDigestFromInputCache(actionInput, inputFileCache);
if (missingDigests.contains(digest)) {
- Chunker.SingleSourceBuilder builder =
- new Chunker.SingleSourceBuilder().input(actionInput, inputFileCache, execRoot);
- inputsToUpload.add(builder);
+ Chunker chunker = new Chunker(actionInput, inputFileCache, execRoot);
+ inputsToUpload.add(chunker);
}
}
@@ -323,7 +321,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
throws IOException, InterruptedException {
ArrayList<Digest> digests = new ArrayList<>();
ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests);
- List<Chunker.SingleSourceBuilder> filesToUpload = new ArrayList<>(digestsToUpload.size());
+ List<Chunker> filesToUpload = new ArrayList<>(digestsToUpload.size());
for (Path file : files) {
if (!file.exists()) {
// We ignore requested results that have not been generated by the action.
@@ -339,8 +337,8 @@ public class GrpcRemoteCache implements RemoteActionCache {
digests.add(digest);
if (digestsToUpload.contains(digest)) {
- Chunker.SingleSourceBuilder chunkerBuilder = new SingleSourceBuilder().input(file);
- filesToUpload.add(chunkerBuilder);
+ Chunker chunker = new Chunker(file);
+ filesToUpload.add(chunker);
}
}
@@ -379,7 +377,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
Digest digest = Digests.computeDigest(file);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploader.uploadBlob(new Chunker.SingleSourceBuilder().input(file));
+ uploader.uploadBlob(new Chunker(file));
}
return digest;
}
@@ -395,7 +393,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
Digest digest = Digests.getDigestFromInputCache(input, inputCache);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploader.uploadBlob(new Chunker.SingleSourceBuilder().input(input, inputCache, execRoot));
+ uploader.uploadBlob(new Chunker(input, inputCache, execRoot));
}
return digest;
}
@@ -404,7 +402,7 @@ public class GrpcRemoteCache implements RemoteActionCache {
Digest digest = Digests.computeDigest(blob);
ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploader.uploadBlob(new Chunker.SingleSourceBuilder().input(blob));
+ uploader.uploadBlob(new Chunker(blob));
}
return digest;
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
index 596b5a65e2..8cd78ff10c 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java
@@ -23,7 +23,6 @@ import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
-import com.google.devtools.build.lib.remote.Chunker.SingleSourceBuilder;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.Metadata;
@@ -106,8 +105,7 @@ public class ByteStreamUploaderTest {
byte[] blob = new byte[CHUNK_SIZE * 2 + 1];
new Random().nextBytes(blob);
- Chunker.SingleSourceBuilder builder =
- new SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob);
+ Chunker chunker = new Chunker(blob, CHUNK_SIZE);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -158,7 +156,7 @@ public class ByteStreamUploaderTest {
}
});
- uploader.uploadBlob(builder);
+ uploader.uploadBlob(chunker);
// This test should not have triggered any retries.
Mockito.verifyZeroInteractions(mockBackoff);
@@ -174,16 +172,15 @@ public class ByteStreamUploaderTest {
int numUploads = 100;
Map<String, byte[]> blobsByHash = new HashMap<>();
- List<Chunker.SingleSourceBuilder> builders = new ArrayList<>(numUploads);
+ List<Chunker> builders = new ArrayList<>(numUploads);
Random rand = new Random();
for (int i = 0; i < numUploads; i++) {
int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE;
byte[] blob = new byte[blobSize];
rand.nextBytes(blob);
- Chunker.SingleSourceBuilder builder =
- new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob);
- builders.add(builder);
- blobsByHash.put(builder.getDigest().getHash(), blob);
+ Chunker chunker = new Chunker(blob, CHUNK_SIZE);
+ builders.add(chunker);
+ blobsByHash.put(chunker.digest().getHash(), blob);
}
Set<String> uploadsFailedOnce = Collections.synchronizedSet(new HashSet<>());
@@ -263,8 +260,7 @@ public class ByteStreamUploaderTest {
new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
byte[] blob = new byte[CHUNK_SIZE * 10];
- Chunker.SingleSourceBuilder builder =
- new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob);
+ Chunker chunker = new Chunker(blob, CHUNK_SIZE);
AtomicInteger numWriteCalls = new AtomicInteger();
@@ -296,8 +292,8 @@ public class ByteStreamUploaderTest {
}
});
- Future<?> upload1 = uploader.uploadBlobAsync(builder);
- Future<?> upload2 = uploader.uploadBlobAsync(builder);
+ Future<?> upload1 = uploader.uploadBlobAsync(chunker);
+ Future<?> upload2 = uploader.uploadBlobAsync(chunker);
assertThat(upload1).isSameAs(upload2);
@@ -313,8 +309,7 @@ public class ByteStreamUploaderTest {
new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService);
byte[] blob = new byte[CHUNK_SIZE];
- Chunker.SingleSourceBuilder builder =
- new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob);
+ Chunker chunker = new Chunker(blob, CHUNK_SIZE);
serviceRegistry.addService(new ByteStreamImplBase() {
@Override
@@ -325,7 +320,7 @@ public class ByteStreamUploaderTest {
});
try {
- uploader.uploadBlob(builder);
+ uploader.uploadBlob(chunker);
fail("Should have thrown an exception.");
} catch (RetryException e) {
assertThat(e.getAttempts()).isEqualTo(2);
@@ -364,15 +359,13 @@ public class ByteStreamUploaderTest {
serviceRegistry.addService(service);
byte[] blob1 = new byte[CHUNK_SIZE];
- Chunker.SingleSourceBuilder builder1 =
- new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob1);
+ Chunker chunker1 = new Chunker(blob1, CHUNK_SIZE);
byte[] blob2 = new byte[CHUNK_SIZE + 1];
- Chunker.SingleSourceBuilder builder2 =
- new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob2);
+ Chunker chunker2 = new Chunker(blob2, CHUNK_SIZE);
- ListenableFuture<Void> f1 = uploader.uploadBlobAsync(builder1);
- ListenableFuture<Void> f2 = uploader.uploadBlobAsync(builder2);
+ ListenableFuture<Void> f1 = uploader.uploadBlobAsync(chunker1);
+ ListenableFuture<Void> f2 = uploader.uploadBlobAsync(chunker2);
assertThat(uploader.uploadsInProgress()).isTrue();
@@ -407,9 +400,9 @@ public class ByteStreamUploaderTest {
assertThat(retryService.isShutdown()).isTrue();
byte[] blob = new byte[1];
- Chunker.SingleSourceBuilder builder = new Chunker.SingleSourceBuilder().input(blob);
+ Chunker chunker = new Chunker(blob, CHUNK_SIZE);
try {
- uploader.uploadBlob(builder);
+ uploader.uploadBlob(chunker);
fail("Should have thrown an exception.");
} catch (RetryException e) {
assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class);
@@ -447,9 +440,9 @@ public class ByteStreamUploaderTest {
});
byte[] blob = new byte[1];
- Chunker.SingleSourceBuilder builder = new Chunker.SingleSourceBuilder().input(blob);
+ Chunker chunker = new Chunker(blob, CHUNK_SIZE);
- uploader.uploadBlob(builder);
+ uploader.uploadBlob(chunker);
}
@Test(timeout = 10000)
@@ -471,10 +464,10 @@ public class ByteStreamUploaderTest {
});
byte[] blob = new byte[1];
- Chunker.SingleSourceBuilder builder = new Chunker.SingleSourceBuilder().input(blob);
+ Chunker chunker = new Chunker(blob, CHUNK_SIZE);
try {
- uploader.uploadBlob(builder);
+ uploader.uploadBlob(chunker);
fail("Should have thrown an exception.");
} catch (RetryException e) {
assertThat(numCalls.get()).isEqualTo(1);
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
index b51b9d6259..110dd6dd8d 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
@@ -14,11 +14,15 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
-import static java.nio.charset.StandardCharsets.UTF_8;
+import static junit.framework.TestCase.fail;
-import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.Chunker.Chunk;
import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.Random;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -28,54 +32,105 @@ import org.junit.runners.JUnit4;
public class ChunkerTest {
@Test
- public void testChunker() throws Exception {
- byte[] b1 = "abcdefg".getBytes(UTF_8);
- byte[] b2 = "hij".getBytes(UTF_8);
- byte[] b3 = "klmnopqrstuvwxyz".getBytes(UTF_8);
- Digest d1 = Digests.computeDigest(b1);
- Digest d2 = Digests.computeDigest(b2);
- Digest d3 = Digests.computeDigest(b3);
- Chunker c = new Chunker.Builder().chunkSize(5).addInput(b1).addInput(b2).addInput(b3).build();
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d1, "abcde".getBytes(UTF_8), 0));
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d1, "fg".getBytes(UTF_8), 5));
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d2, "hij".getBytes(UTF_8), 0));
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d3, "klmno".getBytes(UTF_8), 0));
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d3, "pqrst".getBytes(UTF_8), 5));
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d3, "uvwxy".getBytes(UTF_8), 10));
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d3, "z".getBytes(UTF_8), 15));
- assertThat(c.hasNext()).isFalse();
+ public void chunkingShouldWork() throws IOException {
+ Random rand = new Random();
+ byte[] expectedData = new byte[21];
+ rand.nextBytes(expectedData);
+ Digest expectedDigest = Digests.computeDigest(expectedData);
+
+ Chunker chunker = new Chunker(expectedData, 10);
+
+ ByteArrayOutputStream actualData = new ByteArrayOutputStream();
+
+ assertThat(chunker.hasNext()).isTrue();
+ Chunk next = chunker.next();
+ assertThat(next.getOffset()).isEqualTo(0);
+ assertThat(next.getData()).hasSize(10);
+ assertThat(next.getDigest()).isEqualTo(expectedDigest);
+ next.getData().writeTo(actualData);
+
+ assertThat(chunker.hasNext()).isTrue();
+ next = chunker.next();
+ assertThat(next.getOffset()).isEqualTo(10);
+ assertThat(next.getData()).hasSize(10);
+ assertThat(next.getDigest()).isEqualTo(expectedDigest);
+ next.getData().writeTo(actualData);
+
+ assertThat(chunker.hasNext()).isTrue();
+ next = chunker.next();
+ assertThat(next.getOffset()).isEqualTo(20);
+ assertThat(next.getData()).hasSize(1);
+ assertThat(next.getDigest()).isEqualTo(expectedDigest);
+ next.getData().writeTo(actualData);
+
+ assertThat(chunker.hasNext()).isFalse();
+
+ assertThat(expectedData).isEqualTo(actualData.toByteArray());
+ }
+
+ @Test
+ public void nextShouldThrowIfNoMoreData() throws IOException {
+ byte[] data = new byte[10];
+ Chunker chunker = new Chunker(data, 10);
+
+ assertThat(chunker.hasNext()).isTrue();
+ assertThat(chunker.next()).isNotNull();
+
+ assertThat(chunker.hasNext()).isFalse();
+
+ try {
+ chunker.next();
+ fail("Should have thrown an exception");
+ } catch (NoSuchElementException expected) {
+ // Intentionally left empty.
+ }
+ }
+
+ @Test
+ public void emptyData() throws Exception {
+ byte[] data = new byte[0];
+ Chunker chunker = new Chunker(data);
+
+ assertThat(chunker.hasNext()).isTrue();
+
+ Chunk next = chunker.next();
+
+ assertThat(next).isNotNull();
+ assertThat(next.getData()).isEmpty();
+ assertThat(next.getOffset()).isEqualTo(0);
+
+ assertThat(chunker.hasNext()).isFalse();
+
+ try {
+ chunker.next();
+ fail("Should have thrown an exception");
+ } catch (NoSuchElementException expected) {
+ // Intentionally left empty.
+ }
}
@Test
- public void testIgnoresUnmentionedDigests() throws Exception {
- byte[] b1 = "a".getBytes(UTF_8);
- byte[] b2 = "bb".getBytes(UTF_8);
- byte[] b3 = "ccc".getBytes(UTF_8);
- byte[] b4 = "dddd".getBytes(UTF_8);
- Digest d1 = Digests.computeDigest(b1);
- Digest d3 = Digests.computeDigest(b3);
- Chunker c =
- new Chunker.Builder()
- .chunkSize(2)
- .onlyUseDigests(ImmutableSet.of(d1, d3))
- .addInput(b1)
- .addInput(b2)
- .addInput(b3)
- .addInput(b4)
- .build();
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d1, "a".getBytes(UTF_8), 0));
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d3, "cc".getBytes(UTF_8), 0));
- assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(new Chunk(d3, "c".getBytes(UTF_8), 2));
- assertThat(c.hasNext()).isFalse();
+ public void reset() throws Exception {
+ byte[] data = new byte[]{1, 2, 3};
+ Chunker chunker = new Chunker(data, 1);
+
+ assertNextEquals(chunker, (byte) 1);
+ assertNextEquals(chunker, (byte) 2);
+
+ chunker.reset();
+
+ assertNextEquals(chunker, (byte) 1);
+ assertNextEquals(chunker, (byte) 2);
+ assertNextEquals(chunker, (byte) 3);
+
+ chunker.reset();
+
+ assertNextEquals(chunker, (byte) 1);
+ }
+
+ private void assertNextEquals(Chunker chunker, byte... data) throws IOException {
+ assertThat(chunker.hasNext()).isTrue();
+ ByteString next = chunker.next().getData();
+ assertThat(next.toByteArray()).isEqualTo(data);
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
index 9d65503b48..7b1f359b91 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java
@@ -291,11 +291,7 @@ public class GrpcRemoteCacheTest {
this.responseObserver = responseObserver;
this.contents = contents;
try {
- chunker =
- new Chunker.Builder()
- .chunkSize(chunkSizeBytes)
- .addInput(contents.getBytes(UTF_8))
- .build();
+ chunker = new Chunker(contents.getBytes(UTF_8), chunkSizeBytes);
} catch (IOException e) {
fail("An error occurred:" + e);
}
@@ -308,15 +304,15 @@ public class GrpcRemoteCacheTest {
Chunker.Chunk chunk = chunker.next();
Digest digest = chunk.getDigest();
long offset = chunk.getOffset();
- byte[] data = chunk.getData();
+ ByteString data = chunk.getData();
if (offset == 0) {
assertThat(request.getResourceName()).contains(digest.getHash());
} else {
assertThat(request.getResourceName()).isEmpty();
}
assertThat(request.getFinishWrite())
- .isEqualTo(offset + data.length == digest.getSizeBytes());
- assertThat(request.getData().toByteArray()).isEqualTo(data);
+ .isEqualTo(offset + data.size() == digest.getSizeBytes());
+ assertThat(request.getData()).isEqualTo(data);
} catch (IOException e) {
fail("An error occurred:" + e);
}
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
index 2c6a1fb343..acc6685bf2 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java
@@ -29,7 +29,6 @@ import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.remoteexecution.v1test.Digest;
-import com.google.protobuf.ByteString;
import com.google.rpc.Code;
import com.google.rpc.Status;
import io.grpc.protobuf.StatusProto;
@@ -80,10 +79,10 @@ final class ByteStreamServer extends ByteStreamImplBase {
try {
// This still relies on the blob size to be small enough to fit in memory.
// TODO(olaola): refactor to fix this if the need arises.
- Chunker c = new Chunker.Builder().addInput(cache.downloadBlob(digest)).build();
+ Chunker c = new Chunker(cache.downloadBlob(digest));
while (c.hasNext()) {
responseObserver.onNext(
- ReadResponse.newBuilder().setData(ByteString.copyFrom(c.next().getData())).build());
+ ReadResponse.newBuilder().setData(c.next().getData()).build());
}
responseObserver.onCompleted();
} catch (CacheNotFoundException e) {