diff options
Diffstat (limited to 'src')
7 files changed, 283 insertions, 361 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java index 19b8929efb..599a282f54 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java @@ -33,7 +33,6 @@ import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.remoteexecution.v1test.Digest; -import com.google.protobuf.ByteString; import io.grpc.CallCredentials; import io.grpc.CallOptions; import io.grpc.Channel; @@ -108,7 +107,7 @@ final class ByteStreamUploader { } /** - * Uploads a BLOB, as provided by the {@link Chunker.SingleSourceBuilder}, to the remote {@code + * Uploads a BLOB, as provided by the {@link Chunker}, to the remote {@code * ByteStream} service. The call blocks until the upload is complete, or throws an {@link * Exception} in case of error. * @@ -121,9 +120,9 @@ final class ByteStreamUploader { * @throws IOException when reading of the {@link Chunker}s input source fails * @throws RetryException when the upload failed after a retry */ - public void uploadBlob(Chunker.SingleSourceBuilder chunkerBuilder) + public void uploadBlob(Chunker chunker) throws IOException, InterruptedException { - uploadBlobs(singletonList(chunkerBuilder)); + uploadBlobs(singletonList(chunker)); } /** @@ -141,12 +140,12 @@ final class ByteStreamUploader { * @throws IOException when reading of the {@link Chunker}s input source fails * @throws RetryException when the upload failed after a retry */ - public void uploadBlobs(Iterable<Chunker.SingleSourceBuilder> chunkerBuilders) + public void uploadBlobs(Iterable<Chunker> chunkers) throws IOException, InterruptedException { List<ListenableFuture<Void>> uploads = new ArrayList<>(); - for (Chunker.SingleSourceBuilder chunkerBuilder : chunkerBuilders) { - uploads.add(uploadBlobAsync(chunkerBuilder)); + for (Chunker chunker : chunkers) { + uploads.add(uploadBlobAsync(chunker)); } try { @@ -189,9 +188,9 @@ final class ByteStreamUploader { } @VisibleForTesting - ListenableFuture<Void> uploadBlobAsync(Chunker.SingleSourceBuilder chunkerBuilder) + ListenableFuture<Void> uploadBlobAsync(Chunker chunker) throws IOException { - Digest digest = checkNotNull(chunkerBuilder.getDigest()); + Digest digest = checkNotNull(chunker.digest()); synchronized (lock) { checkState(!isShutdown, "Must not call uploadBlobs after shutdown."); @@ -207,7 +206,7 @@ final class ByteStreamUploader { }, MoreExecutors.directExecutor()); startAsyncUploadWithRetry( - chunkerBuilder, retrier.newBackoff(), (SettableFuture<Void>) uploadResult); + chunker, retrier.newBackoff(), (SettableFuture<Void>) uploadResult); uploadsInProgress.put(digest, uploadResult); } return uploadResult; @@ -222,7 +221,7 @@ final class ByteStreamUploader { } private void startAsyncUploadWithRetry( - Chunker.SingleSourceBuilder chunkerBuilder, + Chunker chunker, Retrier.Backoff backoffTimes, SettableFuture<Void> overallUploadResult) { @@ -242,13 +241,13 @@ final class ByteStreamUploader { RetryException error = new RetryException(cause, backoffTimes.getRetryAttempts()); overallUploadResult.setException(error); } else { - retryAsyncUpload(nextDelayMillis, chunkerBuilder, backoffTimes, overallUploadResult); + retryAsyncUpload(nextDelayMillis, chunker, backoffTimes, overallUploadResult); } } private void retryAsyncUpload( long nextDelayMillis, - Chunker.SingleSourceBuilder chunkerBuilder, + Chunker chunker, Retrier.Backoff backoffTimes, SettableFuture<Void> overallUploadResult) { try { @@ -256,7 +255,7 @@ final class ByteStreamUploader { retryService.schedule( () -> startAsyncUploadWithRetry( - chunkerBuilder, backoffTimes, overallUploadResult), + chunker, backoffTimes, overallUploadResult), nextDelayMillis, MILLISECONDS); // In case the scheduled execution errors, we need to notify the overallUploadResult. @@ -278,9 +277,8 @@ final class ByteStreamUploader { } }; - Chunker chunker; try { - chunker = chunkerBuilder.build(); + chunker.reset(); } catch (IOException e) { overallUploadResult.setException(e); return; @@ -384,7 +382,7 @@ final class ByteStreamUploader { boolean isLastChunk = !chunker.hasNext(); WriteRequest request = requestBuilder - .setData(ByteString.copyFrom(chunk.getData())) + .setData(chunk.getData()) .setWriteOffset(chunk.getOffset()) .setFinishWrite(isLastChunk) .build(); diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index d880f3f7c1..ccc07eaf05 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -14,42 +14,41 @@ package com.google.devtools.build.lib.remote; -import static com.google.devtools.build.lib.util.Preconditions.checkArgument; -import static com.google.devtools.build.lib.util.Preconditions.checkState; +import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; +import com.google.common.base.Throwables; +import com.google.common.io.ByteStreams; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; -import com.google.devtools.build.lib.actions.cache.VirtualActionInput; -import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; import java.util.NoSuchElementException; import java.util.Objects; -import java.util.Set; +import java.util.function.Supplier; -/** An iterator-type object that transforms byte sources into a stream of Chunks. */ +/** + * Splits an {@link InputStream} into one or more {@link Chunk}s of at most {@code chunkSize} bytes. + */ public final class Chunker { - // This is effectively final, should be changed only in unit-tests! + + private static final Chunk EMPTY_CHUNK = + new Chunk(Digests.computeDigest(new byte[0]), ByteString.EMPTY, 0); + private static int defaultChunkSize = 1024 * 16; - private static final byte[] EMPTY_BLOB = new byte[0]; + /** This method must only be called in tests! */ @VisibleForTesting static void setDefaultChunkSizeForTesting(int value) { defaultChunkSize = value; } - public static int getDefaultChunkSize() { + static int getDefaultChunkSize() { return defaultChunkSize; } @@ -58,16 +57,26 @@ public final class Chunker { private final Digest digest; private final long offset; - // TODO(olaola): consider saving data in a different format that byte[]. - private final byte[] data; + private final ByteString data; - @VisibleForTesting - Chunk(Digest digest, byte[] data, long offset) { + private Chunk(Digest digest, ByteString data, long offset) { this.digest = digest; this.data = data; this.offset = offset; } + public Digest getDigest() { + return digest; + } + + public long getOffset() { + return offset; + } + + public ByteString getData() { + return data; + } + @Override public boolean equals(Object o) { if (o == this) { @@ -79,280 +88,154 @@ public final class Chunker { Chunk other = (Chunk) o; return other.offset == offset && other.digest.equals(digest) - && Arrays.equals(other.data, data); + && other.data.equals(data); } @Override public int hashCode() { - return Objects.hash(digest, offset, Arrays.hashCode(data)); - } - - public Digest getDigest() { - return digest; - } - - public long getOffset() { - return offset; - } - - // This returns a mutable copy, for efficiency. - public byte[] getData() { - return data; + return Objects.hash(digest, offset, data); } } - /** An Item is an opaque digestable source of bytes. */ - interface Item { - Digest getDigest() throws IOException; - - InputStream getInputStream() throws IOException; - } - - private final Iterator<Item> inputIterator; - private InputStream currentStream; - private Digest digest; - private long bytesLeft; + private final Supplier<InputStream> dataSupplier; + private final Digest digest; private final int chunkSize; - Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException { - Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0"); - this.inputIterator = inputIterator; - this.chunkSize = chunkSize; - advanceInput(); - } - - Chunker(Item input, int chunkSize) throws IOException { - this(Iterators.singletonIterator(input), chunkSize); - } - - public void advanceInput() throws IOException { - if (inputIterator.hasNext()) { - Item input = inputIterator.next(); - digest = input.getDigest(); - currentStream = input.getInputStream(); - bytesLeft = digest.getSizeBytes(); - } else { - digest = null; - currentStream = null; - bytesLeft = 0; - } - } + private InputStream data; + private long offset; + private byte[] chunkCache; - /** True if the object has more {@link Chunk} elements. */ - public boolean hasNext() { - return currentStream != null; + public Chunker(byte[] data) throws IOException { + this(data, getDefaultChunkSize()); } - /** Consume the next Chunk element. */ - public Chunk next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); - } - long offset = digest.getSizeBytes() - bytesLeft; - byte[] blob = EMPTY_BLOB; - if (bytesLeft > 0) { - blob = new byte[(int) Math.min(bytesLeft, chunkSize)]; - currentStream.read(blob); - bytesLeft -= blob.length; - } - Chunk result = new Chunk(digest, blob, offset); - if (bytesLeft == 0) { - currentStream.close(); - advanceInput(); // Sets the current stream to null, if it was the last. - } - return result; + public Chunker(byte[] data, int chunkSize) throws IOException { + this(() -> new ByteArrayInputStream(data), Digests.computeDigest(data), chunkSize); } - private static Item toItem(final byte[] blob) { - return new Item() { - Digest digest = null; - - @Override - public Digest getDigest() throws IOException { - if (digest == null) { - digest = Digests.computeDigest(blob); - } - return digest; - } - - @Override - public InputStream getInputStream() throws IOException { - return new ByteArrayInputStream(blob); - } - }; + public Chunker(Path file) throws IOException { + this(file, getDefaultChunkSize()); } - private static Item toItem(final Path file) { - return new Item() { - Digest digest = null; - - @Override - public Digest getDigest() throws IOException { - if (digest == null) { - digest = Digests.computeDigest(file); - } - return digest; - } - - @Override - public InputStream getInputStream() throws IOException { + public Chunker(Path file, int chunkSize) throws IOException { + this(() -> { + try { return file.getInputStream(); + } catch (IOException e) { + throw new RuntimeException(e); } - }; - } - - private static Item toItem( - final ActionInput input, final ActionInputFileCache inputCache, final Path execRoot) { - if (input instanceof VirtualActionInput) { - return toItem((VirtualActionInput) input); - } - return new Item() { - @Override - public Digest getDigest() throws IOException { - return Digests.getDigestFromInputCache(input, inputCache); - } - - @Override - public InputStream getInputStream() throws IOException { - return execRoot.getRelative(input.getExecPathString()).getInputStream(); - } - }; + }, Digests.computeDigest(file), chunkSize); } - private static Item toItem(final VirtualActionInput input) { - return new Item() { - Digest digest = null; - - @Override - public Digest getDigest() throws IOException { - if (digest == null) { - digest = Digests.computeDigest(input); - } - return digest; - } - - @Override - public InputStream getInputStream() throws IOException { - ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - input.writeTo(buffer); - return new ByteArrayInputStream(buffer.toByteArray()); - } - }; + public Chunker(ActionInput actionInput, ActionInputFileCache inputCache, Path execRoot) throws + IOException{ + this(actionInput, inputCache, execRoot, getDefaultChunkSize()); } - private static class MemberOf implements Predicate<Item> { - private final Set<Digest> digests; - - public MemberOf(Set<Digest> digests) { - this.digests = digests; - } - - @Override - public boolean apply(Item item) { + public Chunker(ActionInput actionInput, ActionInputFileCache inputCache, Path execRoot, + int chunkSize) + throws IOException { + this(() -> { try { - return digests.contains(item.getDigest()); + return execRoot.getRelative(actionInput.getExecPathString()).getInputStream(); } catch (IOException e) { throw new RuntimeException(e); } - } + }, Digests.getDigestFromInputCache(actionInput, inputCache), chunkSize); } - /** - * Creates a Chunker from a single input source. - * - * <p>As we phase out usages of multiple input sources, this will soon completely replace the - * multiple inputs Builder. - */ - public static final class SingleSourceBuilder { - private Item item; - private int chunkSize = getDefaultChunkSize(); - - public SingleSourceBuilder chunkSize(int chunkSize) { - checkArgument(chunkSize > 0, "chunkSize must be gt 0."); - this.chunkSize = chunkSize; - return this; - } - - public SingleSourceBuilder input(byte[] blob) { - item = toItem(blob); - return this; - } + private Chunker(Supplier<InputStream> dataSupplier, Digest digest, int chunkSize) + throws IOException { + this.dataSupplier = checkNotNull(dataSupplier); + this.digest = checkNotNull(digest); + this.chunkSize = chunkSize; + reset(); + } - public SingleSourceBuilder input(Path file) { - item = toItem(file); - return this; - } + public Digest digest() { + return digest; + } - public SingleSourceBuilder input(ActionInput input, ActionInputFileCache inputCache, - Path execRoot) { - item = toItem(input, inputCache, execRoot); - return this; + /** + * Reset the {@link Chunker} state to when it was newly constructed. + */ + public void reset() throws IOException { + if (data != null) { + data.close(); } - - public Digest getDigest() throws IOException { - checkState(item != null, "Need to specify an input source first."); - return item.getDigest(); + try { + data = dataSupplier.get(); + } catch (RuntimeException e) { + Throwables.propagateIfPossible(e.getCause(), IOException.class); + throw e; } + offset = 0; + chunkCache = null; + } - public Chunker build() throws IOException { - checkState(item != null, "No input source provided."); - return new Chunker(item, chunkSize); - } + /** + * Returns {@code true} if a subsequent call to {@link #next()} returns a {@link Chunk} object; + */ + public boolean hasNext() { + return data != null; } /** - * Create a Chunker from multiple input sources. The order of the sources provided to the Builder - * will be the same order they will be chunked by. + * Returns the next {@link Chunk} or throws a {@link NoSuchElementException} if no data is left. + * + * <p>Always call {@link #hasNext()} before calling this method. + * + * <p>Zero byte inputs are treated special. Instead of throwing a {@link NoSuchElementException} + * on the first call to {@link #next()}, a {@link Chunk} with an empty {@link ByteString} is + * returned. */ - @Deprecated - public static final class Builder { - private final ImmutableList.Builder<Item> items = ImmutableList.builder(); - private Set<Digest> digests = null; - private int chunkSize = getDefaultChunkSize(); - - public Chunker build() throws IOException { - return new Chunker( - digests == null - ? items.build().iterator() - : Iterators.filter(items.build().iterator(), new MemberOf(digests)), - chunkSize); + public Chunk next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); } - public Builder chunkSize(int chunkSize) { - this.chunkSize = chunkSize; - return this; + if (digest.getSizeBytes() == 0) { + data = null; + return EMPTY_CHUNK; } - /** - * Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS - * uploads where a list of digests missing from the CAS is known. - */ - public Builder onlyUseDigests(Set<Digest> digests) { - this.digests = digests; - return this; + // The cast to int is safe, because the return value is capped at chunkSize. + int bytesToRead = (int) Math.min(bytesLeft(), chunkSize); + if (bytesToRead == 0) { + chunkCache = null; + data = null; + throw new NoSuchElementException(); } - public Builder addInput(byte[] blob) { - items.add(toItem(blob)); - return this; + if (chunkCache == null) { + // Lazily allocate it in order to save memory on small data. + // 1) bytesToRead < chunkSize: There will only ever be one next() call. + // 2) bytesToRead == chunkSize: chunkCache will be set to its biggest possible value. + // 3) bytestoRead > chunkSize: Not possible, due to Math.min above. + chunkCache = new byte[bytesToRead]; } - public Builder addInput(Path file) { - items.add(toItem(file)); - return this; + long offsetBefore = offset; + try { + ByteStreams.readFully(data, chunkCache, 0, bytesToRead); + } catch (EOFException e) { + throw new IllegalStateException("Reached EOF, but expected " + + bytesToRead + " bytes.", e); } + offset += bytesToRead; - public Builder addInput(ActionInput input, ActionInputFileCache inputCache, Path execRoot) { - items.add(toItem(input, inputCache, execRoot)); - return this; - } + ByteString blob = ByteString.copyFrom(chunkCache, 0, bytesToRead); - public Builder addAllInputs( - Collection<? extends ActionInput> inputs, ActionInputFileCache inputCache, Path execRoot) { - for (ActionInput input : inputs) { - items.add(toItem(input, inputCache, execRoot)); - } - return this; + if (bytesLeft() == 0) { + data.close(); + data = null; + chunkCache = null; } + + return new Chunk(digest, blob, offsetBefore); + } + + private long bytesLeft() { + return digest.getSizeBytes() - offset; } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java index 615e9f83d7..5b21e249b5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java @@ -27,7 +27,6 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.Chunker.SingleSourceBuilder; import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.Preconditions; @@ -172,14 +171,13 @@ public class GrpcRemoteCache implements RemoteActionCache { } uploadBlob(command.toByteArray()); if (!actionInputs.isEmpty()) { - List<Chunker.SingleSourceBuilder> inputsToUpload = new ArrayList<>(); + List<Chunker> inputsToUpload = new ArrayList<>(); ActionInputFileCache inputFileCache = repository.getInputFileCache(); for (ActionInput actionInput : actionInputs) { Digest digest = Digests.getDigestFromInputCache(actionInput, inputFileCache); if (missingDigests.contains(digest)) { - Chunker.SingleSourceBuilder builder = - new Chunker.SingleSourceBuilder().input(actionInput, inputFileCache, execRoot); - inputsToUpload.add(builder); + Chunker chunker = new Chunker(actionInput, inputFileCache, execRoot); + inputsToUpload.add(chunker); } } @@ -323,7 +321,7 @@ public class GrpcRemoteCache implements RemoteActionCache { throws IOException, InterruptedException { ArrayList<Digest> digests = new ArrayList<>(); ImmutableSet<Digest> digestsToUpload = getMissingDigests(digests); - List<Chunker.SingleSourceBuilder> filesToUpload = new ArrayList<>(digestsToUpload.size()); + List<Chunker> filesToUpload = new ArrayList<>(digestsToUpload.size()); for (Path file : files) { if (!file.exists()) { // We ignore requested results that have not been generated by the action. @@ -339,8 +337,8 @@ public class GrpcRemoteCache implements RemoteActionCache { digests.add(digest); if (digestsToUpload.contains(digest)) { - Chunker.SingleSourceBuilder chunkerBuilder = new SingleSourceBuilder().input(file); - filesToUpload.add(chunkerBuilder); + Chunker chunker = new Chunker(file); + filesToUpload.add(chunker); } } @@ -379,7 +377,7 @@ public class GrpcRemoteCache implements RemoteActionCache { Digest digest = Digests.computeDigest(file); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploader.uploadBlob(new Chunker.SingleSourceBuilder().input(file)); + uploader.uploadBlob(new Chunker(file)); } return digest; } @@ -395,7 +393,7 @@ public class GrpcRemoteCache implements RemoteActionCache { Digest digest = Digests.getDigestFromInputCache(input, inputCache); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploader.uploadBlob(new Chunker.SingleSourceBuilder().input(input, inputCache, execRoot)); + uploader.uploadBlob(new Chunker(input, inputCache, execRoot)); } return digest; } @@ -404,7 +402,7 @@ public class GrpcRemoteCache implements RemoteActionCache { Digest digest = Digests.computeDigest(blob); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploader.uploadBlob(new Chunker.SingleSourceBuilder().input(blob)); + uploader.uploadBlob(new Chunker(blob)); } return digest; } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index 596b5a65e2..8cd78ff10c 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -23,7 +23,6 @@ import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.devtools.build.lib.remote.Chunker.SingleSourceBuilder; import com.google.protobuf.ByteString; import io.grpc.Channel; import io.grpc.Metadata; @@ -106,8 +105,7 @@ public class ByteStreamUploaderTest { byte[] blob = new byte[CHUNK_SIZE * 2 + 1]; new Random().nextBytes(blob); - Chunker.SingleSourceBuilder builder = - new SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob); + Chunker chunker = new Chunker(blob, CHUNK_SIZE); serviceRegistry.addService(new ByteStreamImplBase() { @Override @@ -158,7 +156,7 @@ public class ByteStreamUploaderTest { } }); - uploader.uploadBlob(builder); + uploader.uploadBlob(chunker); // This test should not have triggered any retries. Mockito.verifyZeroInteractions(mockBackoff); @@ -174,16 +172,15 @@ public class ByteStreamUploaderTest { int numUploads = 100; Map<String, byte[]> blobsByHash = new HashMap<>(); - List<Chunker.SingleSourceBuilder> builders = new ArrayList<>(numUploads); + List<Chunker> builders = new ArrayList<>(numUploads); Random rand = new Random(); for (int i = 0; i < numUploads; i++) { int blobSize = rand.nextInt(CHUNK_SIZE * 10) + CHUNK_SIZE; byte[] blob = new byte[blobSize]; rand.nextBytes(blob); - Chunker.SingleSourceBuilder builder = - new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob); - builders.add(builder); - blobsByHash.put(builder.getDigest().getHash(), blob); + Chunker chunker = new Chunker(blob, CHUNK_SIZE); + builders.add(chunker); + blobsByHash.put(chunker.digest().getHash(), blob); } Set<String> uploadsFailedOnce = Collections.synchronizedSet(new HashSet<>()); @@ -263,8 +260,7 @@ public class ByteStreamUploaderTest { new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); byte[] blob = new byte[CHUNK_SIZE * 10]; - Chunker.SingleSourceBuilder builder = - new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob); + Chunker chunker = new Chunker(blob, CHUNK_SIZE); AtomicInteger numWriteCalls = new AtomicInteger(); @@ -296,8 +292,8 @@ public class ByteStreamUploaderTest { } }); - Future<?> upload1 = uploader.uploadBlobAsync(builder); - Future<?> upload2 = uploader.uploadBlobAsync(builder); + Future<?> upload1 = uploader.uploadBlobAsync(chunker); + Future<?> upload2 = uploader.uploadBlobAsync(chunker); assertThat(upload1).isSameAs(upload2); @@ -313,8 +309,7 @@ public class ByteStreamUploaderTest { new ByteStreamUploader(INSTANCE_NAME, channel, null, 3, retrier, retryService); byte[] blob = new byte[CHUNK_SIZE]; - Chunker.SingleSourceBuilder builder = - new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob); + Chunker chunker = new Chunker(blob, CHUNK_SIZE); serviceRegistry.addService(new ByteStreamImplBase() { @Override @@ -325,7 +320,7 @@ public class ByteStreamUploaderTest { }); try { - uploader.uploadBlob(builder); + uploader.uploadBlob(chunker); fail("Should have thrown an exception."); } catch (RetryException e) { assertThat(e.getAttempts()).isEqualTo(2); @@ -364,15 +359,13 @@ public class ByteStreamUploaderTest { serviceRegistry.addService(service); byte[] blob1 = new byte[CHUNK_SIZE]; - Chunker.SingleSourceBuilder builder1 = - new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob1); + Chunker chunker1 = new Chunker(blob1, CHUNK_SIZE); byte[] blob2 = new byte[CHUNK_SIZE + 1]; - Chunker.SingleSourceBuilder builder2 = - new Chunker.SingleSourceBuilder().chunkSize(CHUNK_SIZE).input(blob2); + Chunker chunker2 = new Chunker(blob2, CHUNK_SIZE); - ListenableFuture<Void> f1 = uploader.uploadBlobAsync(builder1); - ListenableFuture<Void> f2 = uploader.uploadBlobAsync(builder2); + ListenableFuture<Void> f1 = uploader.uploadBlobAsync(chunker1); + ListenableFuture<Void> f2 = uploader.uploadBlobAsync(chunker2); assertThat(uploader.uploadsInProgress()).isTrue(); @@ -407,9 +400,9 @@ public class ByteStreamUploaderTest { assertThat(retryService.isShutdown()).isTrue(); byte[] blob = new byte[1]; - Chunker.SingleSourceBuilder builder = new Chunker.SingleSourceBuilder().input(blob); + Chunker chunker = new Chunker(blob, CHUNK_SIZE); try { - uploader.uploadBlob(builder); + uploader.uploadBlob(chunker); fail("Should have thrown an exception."); } catch (RetryException e) { assertThat(e).hasCauseThat().isInstanceOf(RejectedExecutionException.class); @@ -447,9 +440,9 @@ public class ByteStreamUploaderTest { }); byte[] blob = new byte[1]; - Chunker.SingleSourceBuilder builder = new Chunker.SingleSourceBuilder().input(blob); + Chunker chunker = new Chunker(blob, CHUNK_SIZE); - uploader.uploadBlob(builder); + uploader.uploadBlob(chunker); } @Test(timeout = 10000) @@ -471,10 +464,10 @@ public class ByteStreamUploaderTest { }); byte[] blob = new byte[1]; - Chunker.SingleSourceBuilder builder = new Chunker.SingleSourceBuilder().input(blob); + Chunker chunker = new Chunker(blob, CHUNK_SIZE); try { - uploader.uploadBlob(builder); + uploader.uploadBlob(chunker); fail("Should have thrown an exception."); } catch (RetryException e) { assertThat(numCalls.get()).isEqualTo(1); diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java index b51b9d6259..110dd6dd8d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -14,11 +14,15 @@ package com.google.devtools.build.lib.remote; import static com.google.common.truth.Truth.assertThat; -import static java.nio.charset.StandardCharsets.UTF_8; +import static junit.framework.TestCase.fail; -import com.google.common.collect.ImmutableSet; import com.google.devtools.build.lib.remote.Chunker.Chunk; import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.protobuf.ByteString; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.NoSuchElementException; +import java.util.Random; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -28,54 +32,105 @@ import org.junit.runners.JUnit4; public class ChunkerTest { @Test - public void testChunker() throws Exception { - byte[] b1 = "abcdefg".getBytes(UTF_8); - byte[] b2 = "hij".getBytes(UTF_8); - byte[] b3 = "klmnopqrstuvwxyz".getBytes(UTF_8); - Digest d1 = Digests.computeDigest(b1); - Digest d2 = Digests.computeDigest(b2); - Digest d3 = Digests.computeDigest(b3); - Chunker c = new Chunker.Builder().chunkSize(5).addInput(b1).addInput(b2).addInput(b3).build(); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d1, "abcde".getBytes(UTF_8), 0)); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d1, "fg".getBytes(UTF_8), 5)); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d2, "hij".getBytes(UTF_8), 0)); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d3, "klmno".getBytes(UTF_8), 0)); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d3, "pqrst".getBytes(UTF_8), 5)); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d3, "uvwxy".getBytes(UTF_8), 10)); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d3, "z".getBytes(UTF_8), 15)); - assertThat(c.hasNext()).isFalse(); + public void chunkingShouldWork() throws IOException { + Random rand = new Random(); + byte[] expectedData = new byte[21]; + rand.nextBytes(expectedData); + Digest expectedDigest = Digests.computeDigest(expectedData); + + Chunker chunker = new Chunker(expectedData, 10); + + ByteArrayOutputStream actualData = new ByteArrayOutputStream(); + + assertThat(chunker.hasNext()).isTrue(); + Chunk next = chunker.next(); + assertThat(next.getOffset()).isEqualTo(0); + assertThat(next.getData()).hasSize(10); + assertThat(next.getDigest()).isEqualTo(expectedDigest); + next.getData().writeTo(actualData); + + assertThat(chunker.hasNext()).isTrue(); + next = chunker.next(); + assertThat(next.getOffset()).isEqualTo(10); + assertThat(next.getData()).hasSize(10); + assertThat(next.getDigest()).isEqualTo(expectedDigest); + next.getData().writeTo(actualData); + + assertThat(chunker.hasNext()).isTrue(); + next = chunker.next(); + assertThat(next.getOffset()).isEqualTo(20); + assertThat(next.getData()).hasSize(1); + assertThat(next.getDigest()).isEqualTo(expectedDigest); + next.getData().writeTo(actualData); + + assertThat(chunker.hasNext()).isFalse(); + + assertThat(expectedData).isEqualTo(actualData.toByteArray()); + } + + @Test + public void nextShouldThrowIfNoMoreData() throws IOException { + byte[] data = new byte[10]; + Chunker chunker = new Chunker(data, 10); + + assertThat(chunker.hasNext()).isTrue(); + assertThat(chunker.next()).isNotNull(); + + assertThat(chunker.hasNext()).isFalse(); + + try { + chunker.next(); + fail("Should have thrown an exception"); + } catch (NoSuchElementException expected) { + // Intentionally left empty. + } + } + + @Test + public void emptyData() throws Exception { + byte[] data = new byte[0]; + Chunker chunker = new Chunker(data); + + assertThat(chunker.hasNext()).isTrue(); + + Chunk next = chunker.next(); + + assertThat(next).isNotNull(); + assertThat(next.getData()).isEmpty(); + assertThat(next.getOffset()).isEqualTo(0); + + assertThat(chunker.hasNext()).isFalse(); + + try { + chunker.next(); + fail("Should have thrown an exception"); + } catch (NoSuchElementException expected) { + // Intentionally left empty. + } } @Test - public void testIgnoresUnmentionedDigests() throws Exception { - byte[] b1 = "a".getBytes(UTF_8); - byte[] b2 = "bb".getBytes(UTF_8); - byte[] b3 = "ccc".getBytes(UTF_8); - byte[] b4 = "dddd".getBytes(UTF_8); - Digest d1 = Digests.computeDigest(b1); - Digest d3 = Digests.computeDigest(b3); - Chunker c = - new Chunker.Builder() - .chunkSize(2) - .onlyUseDigests(ImmutableSet.of(d1, d3)) - .addInput(b1) - .addInput(b2) - .addInput(b3) - .addInput(b4) - .build(); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d1, "a".getBytes(UTF_8), 0)); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d3, "cc".getBytes(UTF_8), 0)); - assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(new Chunk(d3, "c".getBytes(UTF_8), 2)); - assertThat(c.hasNext()).isFalse(); + public void reset() throws Exception { + byte[] data = new byte[]{1, 2, 3}; + Chunker chunker = new Chunker(data, 1); + + assertNextEquals(chunker, (byte) 1); + assertNextEquals(chunker, (byte) 2); + + chunker.reset(); + + assertNextEquals(chunker, (byte) 1); + assertNextEquals(chunker, (byte) 2); + assertNextEquals(chunker, (byte) 3); + + chunker.reset(); + + assertNextEquals(chunker, (byte) 1); + } + + private void assertNextEquals(Chunker chunker, byte... data) throws IOException { + assertThat(chunker.hasNext()).isTrue(); + ByteString next = chunker.next().getData(); + assertThat(next.toByteArray()).isEqualTo(data); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java index 9d65503b48..7b1f359b91 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteCacheTest.java @@ -291,11 +291,7 @@ public class GrpcRemoteCacheTest { this.responseObserver = responseObserver; this.contents = contents; try { - chunker = - new Chunker.Builder() - .chunkSize(chunkSizeBytes) - .addInput(contents.getBytes(UTF_8)) - .build(); + chunker = new Chunker(contents.getBytes(UTF_8), chunkSizeBytes); } catch (IOException e) { fail("An error occurred:" + e); } @@ -308,15 +304,15 @@ public class GrpcRemoteCacheTest { Chunker.Chunk chunk = chunker.next(); Digest digest = chunk.getDigest(); long offset = chunk.getOffset(); - byte[] data = chunk.getData(); + ByteString data = chunk.getData(); if (offset == 0) { assertThat(request.getResourceName()).contains(digest.getHash()); } else { assertThat(request.getResourceName()).isEmpty(); } assertThat(request.getFinishWrite()) - .isEqualTo(offset + data.length == digest.getSizeBytes()); - assertThat(request.getData().toByteArray()).isEqualTo(data); + .isEqualTo(offset + data.size() == digest.getSizeBytes()); + assertThat(request.getData()).isEqualTo(data); } catch (IOException e) { fail("An error occurred:" + e); } diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java index 2c6a1fb343..acc6685bf2 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/ByteStreamServer.java @@ -29,7 +29,6 @@ import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.remoteexecution.v1test.Digest; -import com.google.protobuf.ByteString; import com.google.rpc.Code; import com.google.rpc.Status; import io.grpc.protobuf.StatusProto; @@ -80,10 +79,10 @@ final class ByteStreamServer extends ByteStreamImplBase { try { // This still relies on the blob size to be small enough to fit in memory. // TODO(olaola): refactor to fix this if the need arises. - Chunker c = new Chunker.Builder().addInput(cache.downloadBlob(digest)).build(); + Chunker c = new Chunker(cache.downloadBlob(digest)); while (c.hasNext()) { responseObserver.onNext( - ReadResponse.newBuilder().setData(ByteString.copyFrom(c.next().getData())).build()); + ReadResponse.newBuilder().setData(c.next().getData()).build()); } responseObserver.onCompleted(); } catch (CacheNotFoundException e) { |