diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java | 108 |
1 files changed, 101 insertions, 7 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java index 4372932928..1076933fa0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java @@ -19,6 +19,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; import com.google.devtools.build.lib.actions.ActionInput; +import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.analysis.config.InvalidConfigurationException; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub; @@ -216,6 +217,83 @@ public final class GrpcActionCache implements RemoteActionCache { } } + final class BlobChunkActionInputIterator implements BlobChunkIterator { + private final Iterator<? extends ActionInput> inputIterator; + private final ActionInputFileCache inputCache; + private final Path execRoot; + private InputStream currentStream; + private final Set<ContentDigest> digests; + private ContentDigest digest; + private long bytesLeft; + + public BlobChunkActionInputIterator( + Set<ContentDigest> digests, + Path execRoot, + Iterator<? extends ActionInput> inputIterator, + ActionInputFileCache inputCache) + throws IOException { + this.digests = digests; + this.inputIterator = inputIterator; + this.inputCache = inputCache; + this.execRoot = execRoot; + advanceInput(); + } + + public BlobChunkActionInputIterator( + ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException { + inputIterator = Iterators.singletonIterator(input); + digests = ImmutableSet.of(ContentDigests.getDigestFromInputCache(input, inputCache)); + this.inputCache = inputCache; + this.execRoot = execRoot; + advanceInput(); + } + + private void advanceInput() throws IOException { + do { + if (inputIterator != null && inputIterator.hasNext()) { + ActionInput input = inputIterator.next(); + digest = ContentDigests.getDigestFromInputCache(input, inputCache); + currentStream = execRoot.getRelative(input.getExecPathString()).getInputStream(); + bytesLeft = digest.getSizeBytes(); + } else { + digest = null; + currentStream = null; + bytesLeft = 0; + } + } while (digest != null && !digests.contains(digest)); + } + + @Override + public boolean hasNext() { + return currentStream != null; + } + + @Override + public BlobChunk next() throws IOException { + if (!hasNext()) { + throw new NoSuchElementException(); + } + BlobChunk.Builder chunk = BlobChunk.newBuilder(); + long offset = digest.getSizeBytes() - bytesLeft; + if (offset == 0) { + chunk.setDigest(digest); + } else { + chunk.setOffset(offset); + } + if (bytesLeft > 0) { + byte[] blob = new byte[(int) Math.min(bytesLeft, (long) options.grpcMaxChunkSizeBytes)]; + currentStream.read(blob); + chunk.setData(ByteString.copyFrom(blob)); + bytesLeft -= blob.length; + } + if (bytesLeft == 0) { + currentStream.close(); + advanceInput(); + } + return chunk.build(); + } + } + @VisibleForTesting public GrpcActionCache(ManagedChannel channel, RemoteOptions options) { this.options = options; @@ -278,11 +356,10 @@ public final class GrpcActionCache implements RemoteActionCache { } } if (!actionInputs.isEmpty()) { - ArrayList<Path> paths = new ArrayList<>(); - for (ActionInput actionInput : actionInputs) { - paths.add(execRoot.getRelative(actionInput.getExecPathString())); - } - uploadChunks(paths.size(), new BlobChunkFileIterator(missingDigests, paths.iterator())); + uploadChunks( + actionInputs.size(), + new BlobChunkActionInputIterator( + missingDigests, execRoot, actionInputs.iterator(), repository.getInputFileCache())); } } @@ -412,8 +489,7 @@ public final class GrpcActionCache implements RemoteActionCache { /** * Put the file contents cache if it is not already in it. No-op if the file is already stored in - * cache. The given path must be a full absolute path. Note: this is horribly inefficient, need to - * patch through an overload that uses an ActionInputFile cache to compute the digests! + * cache. The given path must be a full absolute path. * * @return The key for fetching the file contents blob from cache. */ @@ -428,6 +504,24 @@ public final class GrpcActionCache implements RemoteActionCache { } /** + * Put the file contents cache if it is not already in it. No-op if the file is already stored in + * cache. The given path must be a full absolute path. + * + * @return The key for fetching the file contents blob from cache. + */ + @Override + public ContentDigest uploadFileContents( + ActionInput input, Path execRoot, ActionInputFileCache inputCache) + throws IOException, InterruptedException { + ContentDigest digest = ContentDigests.getDigestFromInputCache(input, inputCache); + ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest)); + if (!missing.isEmpty()) { + uploadChunks(1, new BlobChunkActionInputIterator(input, execRoot, inputCache)); + } + return digest; + } + + /** * Download a blob keyed by the given digest and write it to the specified path. Set the * executable parameter to the specified value. */ |