diff options
3 files changed, 337 insertions, 223 deletions
// Copyright 2016 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.devtools.build.lib.remote;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;
import javax.annotation.Nullable;

/**
 * An iterator-type object that transforms byte sources into a stream of BlobChunk messages.
 *
 * <p>Inputs are consumed in the order supplied. The first chunk produced for an input carries the
 * input's digest; every following chunk of the same input carries its byte offset instead. An
 * input whose digest reports a size of zero still yields one chunk (digest only, no data).
 */
public final class Chunker {
  /** An Item is an opaque digestable source of bytes. */
  interface Item {
    ContentDigest getDigest() throws IOException;

    InputStream getInputStream() throws IOException;
  }

  private final Iterator<Item> inputIterator;
  private InputStream currentStream;
  private final Set<ContentDigest> digests;
  private ContentDigest digest;
  private long bytesLeft;
  private final int chunkSize;

  /**
   * Creates a Chunker over the given inputs and positions it on the first selected input.
   *
   * @param inputIterator the inputs to chunk, in order
   * @param chunkSize maximum number of data bytes per chunk; must be greater than 0
   * @param digests if present, specifies which digests to output out of the whole input
   * @throws IOException if the digest or stream of the first selected input cannot be obtained
   */
  Chunker(
      Iterator<Item> inputIterator,
      int chunkSize,
      @Nullable Set<ContentDigest> digests)
      throws IOException {
    Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0");
    this.digests = digests;
    this.inputIterator = inputIterator;
    this.chunkSize = chunkSize;
    advanceInput();
  }

  /** Creates a Chunker that outputs every input, without any digest filtering. */
  Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
    this(inputIterator, chunkSize, null);
  }

  /** Creates a Chunker over a single input. */
  Chunker(Item input, int chunkSize) throws IOException {
    this(Iterators.singletonIterator(input), chunkSize, ImmutableSet.of(input.getDigest()));
  }

  /**
   * Moves to the next input that passes the digest filter, or clears the current state when the
   * inputs are exhausted.
   *
   * <p>The digest filter is consulted before the input is opened, so that inputs which are
   * skipped never have a stream opened (and therefore never leak one).
   */
  private void advanceInput() throws IOException {
    while (inputIterator != null && inputIterator.hasNext()) {
      Item input = inputIterator.next();
      ContentDigest candidate = input.getDigest();
      if (digests != null && !digests.contains(candidate)) {
        continue; // Not requested: skip without opening the stream.
      }
      // Open the stream before touching any field so a failure leaves the state unchanged.
      InputStream stream = input.getInputStream();
      digest = candidate;
      currentStream = stream;
      bytesLeft = candidate.getSizeBytes();
      return;
    }
    digest = null;
    currentStream = null;
    bytesLeft = 0;
  }

  /**
   * Fills {@code buffer} completely from {@code in}.
   *
   * <p>A single {@link InputStream#read(byte[])} call may return fewer bytes than requested, so
   * reading is repeated until the buffer is full.
   *
   * @throws IOException if the stream ends before the buffer is full, or if reading fails
   */
  private static void readFully(InputStream in, byte[] buffer) throws IOException {
    int filled = 0;
    while (filled < buffer.length) {
      int count = in.read(buffer, filled, buffer.length - filled);
      if (count < 0) {
        throw new IOException(
            "Unexpected end of input: expected "
                + buffer.length
                + " more bytes but only "
                + filled
                + " were available");
      }
      filled += count;
    }
  }

  /** True if the object has more BlobChunk elements. */
  public boolean hasNext() {
    return currentStream != null;
  }

  /**
   * Consume the next BlobChunk element.
   *
   * @return a chunk holding at most {@code chunkSize} data bytes of the current input
   * @throws NoSuchElementException if there are no more chunks
   * @throws IOException if the current input cannot be read, ends before the size stated by its
   *     digest, or if advancing to the next input fails
   */
  public BlobChunk next() throws IOException {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    BlobChunk.Builder chunk = BlobChunk.newBuilder();
    long offset = digest.getSizeBytes() - bytesLeft;
    if (offset == 0) {
      chunk.setDigest(digest);
    } else {
      chunk.setOffset(offset);
    }
    if (bytesLeft > 0) {
      byte[] blob = new byte[(int) Math.min(bytesLeft, (long) chunkSize)];
      readFully(currentStream, blob);
      chunk.setData(ByteString.copyFrom(blob));
      bytesLeft -= blob.length;
    }
    if (bytesLeft == 0) {
      currentStream.close();
      advanceInput(); // Sets the current stream to null, if it was the last.
    }
    return chunk.build();
  }

  /** Wraps an in-memory blob as an Item. */
  static Item toItem(final byte[] blob) {
    return new Item() {
      @Override
      public ContentDigest getDigest() throws IOException {
        return ContentDigests.computeDigest(blob);
      }

      @Override
      public InputStream getInputStream() throws IOException {
        return new ByteArrayInputStream(blob);
      }
    };
  }

  /** Wraps a file as an Item. */
  static Item toItem(final Path file) {
    return new Item() {
      @Override
      public ContentDigest getDigest() throws IOException {
        return ContentDigests.computeDigest(file);
      }

      @Override
      public InputStream getInputStream() throws IOException {
        return file.getInputStream();
      }
    };
  }

  /**
   * Wraps an ActionInput as an Item. The digest is taken from the input cache and the bytes are
   * read from the input's exec path resolved against {@code execRoot}.
   */
  static Item toItem(
      final ActionInput input, final ActionInputFileCache inputCache, final Path execRoot) {
    return new Item() {
      @Override
      public ContentDigest getDigest() throws IOException {
        return ContentDigests.getDigestFromInputCache(input, inputCache);
      }

      @Override
      public InputStream getInputStream() throws IOException {
        return execRoot.getRelative(input.getExecPathString()).getInputStream();
      }
    };
  }

  /**
   * Create a Chunker from a given ActionInput, taking its digest from the provided
   * ActionInputFileCache.
   */
  public static Chunker from(
      ActionInput input, int chunkSize, ActionInputFileCache inputCache, Path execRoot)
      throws IOException {
    return new Chunker(toItem(input, inputCache, execRoot), chunkSize);
  }

  /** Create a Chunker from a given blob and chunkSize. */
  public static Chunker from(byte[] blob, int chunkSize) throws IOException {
    return new Chunker(toItem(blob), chunkSize);
  }

  /** Create a Chunker from a given Path and chunkSize. */
  public static Chunker from(Path file, int chunkSize) throws IOException {
    return new Chunker(toItem(file), chunkSize);
  }

  /**
   * Create a Chunker from multiple input sources. The order of the sources provided to the Builder
   * will be the same order they will be chunked by.
   *
   * <p>{@link #chunkSize(int)} must be called with a positive value before {@link #build()}: the
   * initial value is 0, which is rejected by the Chunker constructor's argument check.
   */
  public static final class Builder {
    private final ArrayList<Item> items = new ArrayList<>();
    private Set<ContentDigest> digests = null;
    private int chunkSize = 0;

    /** Builds the Chunker from the inputs, chunk size and digest filter collected so far. */
    public Chunker build() throws IOException {
      return new Chunker(items.iterator(), chunkSize, digests);
    }

    /** Sets the maximum number of data bytes per chunk. */
    public Builder chunkSize(int chunkSize) {
      this.chunkSize = chunkSize;
      return this;
    }

    /**
     * Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS
     * uploads where a list of digests missing from the CAS is known.
     */
    public Builder onlyUseDigests(Set<ContentDigest> digests) {
      this.digests = digests;
      return this;
    }

    /** Appends an in-memory blob to the inputs. */
    public Builder addInput(byte[] blob) {
      items.add(toItem(blob));
      return this;
    }

    /** Appends a file to the inputs. */
    public Builder addInput(Path file) {
      items.add(toItem(file));
      return this;
    }

    /** Appends a single ActionInput to the inputs. */
    public Builder addInput(ActionInput input, ActionInputFileCache inputCache, Path execRoot) {
      items.add(toItem(input, inputCache, execRoot));
      return this;
    }

    /** Appends every ActionInput of the collection, in the collection's iteration order. */
    public Builder addAllInputs(
        Collection<? extends ActionInput> inputs, ActionInputFileCache inputCache, Path execRoot) {
      for (ActionInput input : inputs) {
        items.add(toItem(input, inputCache, execRoot));
      }
      return this;
    }
  }
}
*/ - static interface BlobChunkIterator { - boolean hasNext(); - - BlobChunk next() throws IOException; // IOException can be a result of file read. - } - - final class BlobChunkInlineIterator implements BlobChunkIterator { - private final Iterator<byte[]> blobIterator; - private final Set<ContentDigest> digests; - private int offset; - private ContentDigest digest; - private byte[] currentBlob; - - public BlobChunkInlineIterator(Set<ContentDigest> digests, Iterator<byte[]> blobIterator) { - this.digests = digests; - this.blobIterator = blobIterator; - advanceInput(); - } - - public BlobChunkInlineIterator(byte[] blob) { - blobIterator = null; - offset = 0; - currentBlob = blob; - digest = ContentDigests.computeDigest(currentBlob); - digests = null; - } - - private void advanceInput() { - offset = 0; - do { - if (blobIterator != null && blobIterator.hasNext()) { - currentBlob = blobIterator.next(); - digest = ContentDigests.computeDigest(currentBlob); - } else { - currentBlob = null; - digest = null; - } - } while (digest != null && !digests.contains(digest)); - } - - @Override - public boolean hasNext() { - return currentBlob != null; - } - - @Override - public BlobChunk next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); - } - BlobChunk.Builder chunk = BlobChunk.newBuilder(); - if (offset == 0) { - chunk.setDigest(digest); - } else { - chunk.setOffset(offset); - } - int size = Math.min(currentBlob.length - offset, options.grpcMaxChunkSizeBytes); - if (size > 0) { - chunk.setData(ByteString.copyFrom(currentBlob, offset, size)); - offset += size; - } - if (offset >= currentBlob.length) { - advanceInput(); - } - return chunk.build(); - } - } - - final class BlobChunkFileIterator implements BlobChunkIterator { - private final Iterator<Path> fileIterator; - private InputStream currentStream; - private final Set<ContentDigest> digests; - private ContentDigest digest; - private long bytesLeft; - - public 
BlobChunkFileIterator(Set<ContentDigest> digests, Iterator<Path> fileIterator) - throws IOException { - this.digests = digests; - this.fileIterator = fileIterator; - advanceInput(); - } - - public BlobChunkFileIterator(Path file) throws IOException { - fileIterator = Iterators.singletonIterator(file); - digests = ImmutableSet.of(ContentDigests.computeDigest(file)); - advanceInput(); - } - - private void advanceInput() throws IOException { - do { - if (fileIterator != null && fileIterator.hasNext()) { - Path file = fileIterator.next(); - digest = ContentDigests.computeDigest(file); - currentStream = file.getInputStream(); - bytesLeft = digest.getSizeBytes(); - } else { - digest = null; - currentStream = null; - bytesLeft = 0; - } - } while (digest != null && !digests.contains(digest)); - } - - @Override - public boolean hasNext() { - return currentStream != null; - } - - @Override - public BlobChunk next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); - } - BlobChunk.Builder chunk = BlobChunk.newBuilder(); - long offset = digest.getSizeBytes() - bytesLeft; - if (offset == 0) { - chunk.setDigest(digest); - } else { - chunk.setOffset(offset); - } - if (bytesLeft > 0) { - byte[] blob = new byte[(int) Math.min(bytesLeft, options.grpcMaxChunkSizeBytes)]; - currentStream.read(blob); - chunk.setData(ByteString.copyFrom(blob)); - bytesLeft -= blob.length; - } - if (bytesLeft == 0) { - currentStream.close(); - advanceInput(); - } - return chunk.build(); - } - } - - final class BlobChunkActionInputIterator implements BlobChunkIterator { - private final Iterator<? extends ActionInput> inputIterator; - private final ActionInputFileCache inputCache; - private final Path execRoot; - private InputStream currentStream; - private final Set<ContentDigest> digests; - private ContentDigest digest; - private long bytesLeft; - - public BlobChunkActionInputIterator( - Set<ContentDigest> digests, - Path execRoot, - Iterator<? 
extends ActionInput> inputIterator, - ActionInputFileCache inputCache) - throws IOException { - this.digests = digests; - this.inputIterator = inputIterator; - this.inputCache = inputCache; - this.execRoot = execRoot; - advanceInput(); - } - - public BlobChunkActionInputIterator( - ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException { - inputIterator = Iterators.singletonIterator(input); - digests = ImmutableSet.of(ContentDigests.getDigestFromInputCache(input, inputCache)); - this.inputCache = inputCache; - this.execRoot = execRoot; - advanceInput(); - } - - private void advanceInput() throws IOException { - do { - if (inputIterator != null && inputIterator.hasNext()) { - ActionInput input = inputIterator.next(); - digest = ContentDigests.getDigestFromInputCache(input, inputCache); - currentStream = execRoot.getRelative(input.getExecPathString()).getInputStream(); - bytesLeft = digest.getSizeBytes(); - } else { - digest = null; - currentStream = null; - bytesLeft = 0; - } - } while (digest != null && !digests.contains(digest)); - } - - @Override - public boolean hasNext() { - return currentStream != null; - } - - @Override - public BlobChunk next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); - } - BlobChunk.Builder chunk = BlobChunk.newBuilder(); - long offset = digest.getSizeBytes() - bytesLeft; - if (offset == 0) { - chunk.setDigest(digest); - } else { - chunk.setOffset(offset); - } - if (bytesLeft > 0) { - byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)]; - currentStream.read(blob); - chunk.setData(ByteString.copyFrom(blob)); - bytesLeft -= blob.length; - } - if (bytesLeft == 0) { - currentStream.close(); - advanceInput(); - } - return chunk.build(); - } - } - public GrpcActionCache( RemoteOptions options, GrpcCasInterface casIface, GrpcExecutionCacheInterface iface) { this.options = options; @@ -352,8 +138,11 @@ public class GrpcActionCache implements 
RemoteActionCache { if (!actionInputs.isEmpty()) { uploadChunks( actionInputs.size(), - new BlobChunkActionInputIterator( - missingDigests, execRoot, actionInputs.iterator(), repository.getInputFileCache())); + new Chunker.Builder() + .chunkSize(options.grpcMaxChunkSizeBytes) + .addAllInputs(actionInputs, repository.getInputFileCache(), execRoot) + .onlyUseDigests(missingDigests) + .build()); } } @@ -460,12 +249,14 @@ public class GrpcActionCache implements RemoteActionCache { public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result) throws IOException, InterruptedException { ArrayList<ContentDigest> digests = new ArrayList<>(); + Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes); for (Path file : files) { digests.add(ContentDigests.computeDigest(file)); + b.addInput(file); } ImmutableSet<ContentDigest> missing = getMissingDigests(digests); if (!missing.isEmpty()) { - uploadChunks(missing.size(), new BlobChunkFileIterator(missing, files.iterator())); + uploadChunks(missing.size(), b.onlyUseDigests(missing).build()); } int index = 0; for (Path file : files) { @@ -495,7 +286,7 @@ public class GrpcActionCache implements RemoteActionCache { ContentDigest digest = ContentDigests.computeDigest(file); ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploadChunks(1, new BlobChunkFileIterator(file)); + uploadChunks(1, Chunker.from(file, options.grpcMaxChunkSizeBytes)); } return digest; } @@ -513,7 +304,7 @@ public class GrpcActionCache implements RemoteActionCache { ContentDigest digest = ContentDigests.getDigestFromInputCache(input, inputCache); ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploadChunks(1, new BlobChunkActionInputIterator(input, execRoot, inputCache)); + uploadChunks(1, Chunker.from(input, options.grpcMaxChunkSizeBytes, inputCache, execRoot)); } return digest; 
} @@ -549,7 +340,7 @@ public class GrpcActionCache implements RemoteActionCache { } } - private void uploadChunks(int numItems, BlobChunkIterator blobs) + private void uploadChunks(int numItems, Chunker blobs) throws InterruptedException, IOException { CountDownLatch finishLatch = new CountDownLatch(numItems); // Maximal number of batches. AtomicReference<RuntimeException> exception = new AtomicReference<>(null); @@ -610,13 +401,15 @@ public class GrpcActionCache implements RemoteActionCache { public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException { ArrayList<ContentDigest> digests = new ArrayList<>(); + Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes); for (byte[] blob : blobs) { digests.add(ContentDigests.computeDigest(blob)); + b.addInput(blob); } ImmutableSet<ContentDigest> missing = getMissingDigests(digests); try { if (!missing.isEmpty()) { - uploadChunks(missing.size(), new BlobChunkInlineIterator(missing, blobs.iterator())); + uploadChunks(missing.size(), b.onlyUseDigests(missing).build()); } return ImmutableList.copyOf(digests); } catch (IOException e) { @@ -631,7 +424,7 @@ public class GrpcActionCache implements RemoteActionCache { ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest)); try { if (!missing.isEmpty()) { - uploadChunks(1, new BlobChunkInlineIterator(blob)); + uploadChunks(1, Chunker.from(blob, options.grpcMaxChunkSizeBytes)); } return digest; } catch (IOException e) { diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java new file mode 100644 index 0000000000..13be580204 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -0,0 +1,90 @@ +// Copyright 2015 The Bazel Authors. All rights reserved. 
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.protobuf.ByteString;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link Chunker}. */
@RunWith(JUnit4.class)
public class ChunkerTest {

  /** Builds the expected form of a follow-up chunk: byte offset plus data, no digest. */
  static BlobChunk buildChunk(long offset, String data) {
    ByteString payload = ByteString.copyFromUtf8(data);
    return BlobChunk.newBuilder().setOffset(offset).setData(payload).build();
  }

  /** Builds the expected form of the first chunk of an input: digest plus data. */
  static BlobChunk buildChunk(ContentDigest digest, String data) {
    ByteString payload = ByteString.copyFromUtf8(data);
    return BlobChunk.newBuilder().setDigest(digest).setData(payload).build();
  }

  /**
   * Asserts that the chunker yields exactly the expected chunks, in order, reporting hasNext()
   * as true before each one and as false once they are all consumed.
   */
  private static void assertYields(Chunker chunker, BlobChunk... expectedChunks)
      throws Exception {
    for (BlobChunk expected : expectedChunks) {
      assertThat(chunker.hasNext()).isTrue();
      assertThat(chunker.next()).isEqualTo(expected);
    }
    assertThat(chunker.hasNext()).isFalse();
  }

  @Test
  public void testChunker() throws Exception {
    byte[] first = "abcdefg".getBytes(UTF_8);
    byte[] second = "hij".getBytes(UTF_8);
    byte[] third = "klmnopqrstuvwxyz".getBytes(UTF_8);
    ContentDigest firstDigest = ContentDigests.computeDigest(first);
    ContentDigest secondDigest = ContentDigests.computeDigest(second);
    ContentDigest thirdDigest = ContentDigests.computeDigest(third);

    Chunker.Builder builder = new Chunker.Builder().chunkSize(5);
    builder.addInput(first).addInput(second).addInput(third);
    Chunker chunker = builder.build();

    assertYields(
        chunker,
        buildChunk(firstDigest, "abcde"),
        buildChunk(5, "fg"),
        buildChunk(secondDigest, "hij"),
        buildChunk(thirdDigest, "klmno"),
        buildChunk(5, "pqrst"),
        buildChunk(10, "uvwxy"),
        buildChunk(15, "z"));
  }

  @Test
  public void testIgnoresUnmentionedDigests() throws Exception {
    byte[] wantedShort = "a".getBytes(UTF_8);
    byte[] skippedShort = "bb".getBytes(UTF_8);
    byte[] wantedLong = "ccc".getBytes(UTF_8);
    byte[] skippedLong = "dddd".getBytes(UTF_8);
    ContentDigest wantedShortDigest = ContentDigests.computeDigest(wantedShort);
    ContentDigest wantedLongDigest = ContentDigests.computeDigest(wantedLong);

    Chunker.Builder builder =
        new Chunker.Builder()
            .chunkSize(2)
            .onlyUseDigests(ImmutableSet.of(wantedShortDigest, wantedLongDigest));
    builder
        .addInput(wantedShort)
        .addInput(skippedShort)
        .addInput(wantedLong)
        .addInput(skippedLong);
    Chunker chunker = builder.build();

    assertYields(
        chunker,
        buildChunk(wantedShortDigest, "a"),
        buildChunk(wantedLongDigest, "cc"),
        buildChunk(2, "c"));
  }
}