diff options
Diffstat (limited to 'src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java')
-rw-r--r-- | src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java | 205 |
1 files changed, 123 insertions, 82 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java index 7e27653e76..72c75700b5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ConcurrentMapActionCache.java @@ -14,18 +14,24 @@ package com.google.devtools.build.lib.remote; -import com.google.common.hash.HashCode; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; +import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.RemoteProtocol.CacheEntry; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileEntry; +import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; +import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; +import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata; +import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; +import com.google.devtools.build.lib.remote.RemoteProtocol.Output; +import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; +import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; @@ -38,121 +44,156 @@ import java.util.concurrent.Semaphore; */ @ThreadSafe public final class ConcurrentMapActionCache implements RemoteActionCache { - private final Path execRoot; private final ConcurrentMap<String, byte[]> cache; private static final int MAX_MEMORY_KBYTES = 512 * 1024; private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true); - public ConcurrentMapActionCache(Path execRoot, ConcurrentMap<String, byte[]> cache) { - this.execRoot = execRoot; + public ConcurrentMapActionCache(ConcurrentMap<String, byte[]> cache) { this.cache = cache; } @Override - public String putFileIfNotExist(ActionInputFileCache cache, ActionInput file) + public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root) throws IOException, InterruptedException { - String contentKey = HashCode.fromBytes(cache.getDigest(file)).toString(); - if (containsFile(contentKey)) { - return contentKey; + repository.computeMerkleDigests(root); + for (FileNode fileNode : repository.treeToFileNodes(root)) { + uploadBlob(fileNode.toByteArray()); + } + for (TreeNode leaf : repository.leaves(root)) { + uploadFileContents(execRoot.getRelative(leaf.getActionInput().getExecPathString())); } - putFile(contentKey, execRoot.getRelative(file.getExecPathString())); - return contentKey; } - private void putFile(String key, Path file) throws IOException, InterruptedException { - int fileSizeKBytes = (int) (file.getFileSize() / 1024); - Preconditions.checkArgument(fileSizeKBytes < MAX_MEMORY_KBYTES); - try { - uploadMemoryAvailable.acquire(fileSizeKBytes); - // TODO(alpha): I should put the file content as chunks to avoid reading the entire - // file into memory. - try (InputStream stream = file.getInputStream()) { - cache.put( - key, - CacheEntry.newBuilder() - .setFileContent(ByteString.readFrom(stream)) - .build() - .toByteArray()); - } - } finally { - uploadMemoryAvailable.release(fileSizeKBytes); + @Override + public void downloadTree(ContentDigest rootDigest, Path rootLocation) + throws IOException, CacheNotFoundException { + FileNode fileNode = FileNode.parseFrom(downloadBlob(rootDigest)); + if (fileNode.hasFileMetadata()) { + FileMetadata meta = fileNode.getFileMetadata(); + downloadFileContents(meta.getDigest(), rootLocation, meta.getExecutable()); + } + for (FileNode.Child child : fileNode.getChildList()) { + downloadTree(child.getDigest(), rootLocation.getRelative(child.getPath())); } } @Override - public void writeFile(String key, Path dest, boolean executable) + public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException { + // This unconditionally reads the whole file into memory first! + return uploadBlob(ByteString.readFrom(file.getInputStream()).toByteArray()); + } + + @Override + public void downloadAllResults(ActionResult result, Path execRoot) throws IOException, CacheNotFoundException { - byte[] data = cache.get(key); - if (data == null) { - throw new CacheNotFoundException("File content cannot be found with key: " + key); - } - try (OutputStream stream = dest.getOutputStream()) { - CacheEntry.parseFrom(data).getFileContent().writeTo(stream); - dest.setExecutable(executable); + for (Output output : result.getOutputList()) { + if (output.getContentCase() == ContentCase.FILE_METADATA) { + FileMetadata m = output.getFileMetadata(); + downloadFileContents( + m.getDigest(), execRoot.getRelative(output.getPath()), m.getExecutable()); + } else { + downloadTree(output.getDigest(), execRoot.getRelative(output.getPath())); + } } } - private boolean containsFile(String key) { - return cache.containsKey(key); + @Override + public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result) + throws IOException, InterruptedException { + for (Path file : files) { + if (file.isDirectory()) { + // TODO(olaola): to implement this for a directory, will need to create or pass a + // TreeNodeRepository to call uploadTree. + throw new UnsupportedOperationException("Storing a directory is not yet supported."); + } + // First put the file content to cache. + ContentDigest digest = uploadFileContents(file); + // Add to protobuf. + result + .addOutputBuilder() + .setPath(file.relativeTo(execRoot).getPathString()) + .getFileMetadataBuilder() + .setDigest(digest) + .setExecutable(file.isExecutable()); + } } @Override - public void writeActionOutput(String key, Path execRoot) + public void downloadFileContents(ContentDigest digest, Path dest, boolean executable) throws IOException, CacheNotFoundException { - byte[] data = cache.get(key); - if (data == null) { - throw new CacheNotFoundException("Action output cannot be found with key: " + key); + // This unconditionally downloads the whole file into memory first! + byte[] contents = downloadBlob(digest); + FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory()); + try (OutputStream stream = dest.getOutputStream()) { + stream.write(contents); } - CacheEntry cacheEntry = CacheEntry.parseFrom(data); - for (FileEntry file : cacheEntry.getFilesList()) { - writeFile(file.getContentKey(), execRoot.getRelative(file.getPath()), file.getExecutable()); + dest.setExecutable(executable); + } + + @Override + public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) + throws InterruptedException { + ArrayList<ContentDigest> digests = new ArrayList<>(); + for (byte[] blob : blobs) { + digests.add(uploadBlob(blob)); } + return ImmutableList.copyOf(digests); } @Override - public void putActionOutput(String key, Collection<? extends ActionInput> outputs) - throws IOException, InterruptedException { - CacheEntry.Builder actionOutput = CacheEntry.newBuilder(); - for (ActionInput output : outputs) { - Path file = execRoot.getRelative(output.getExecPathString()); - addToActionOutput(file, output.getExecPathString(), actionOutput); + public ContentDigest uploadBlob(byte[] blob) throws InterruptedException { + int blobSizeKBytes = blob.length / 1024; + Preconditions.checkArgument(blobSizeKBytes < MAX_MEMORY_KBYTES); + ContentDigest digest = ContentDigests.computeDigest(blob); + uploadMemoryAvailable.acquire(blobSizeKBytes); + try { + cache.put(ContentDigests.toHexString(digest), blob); + } finally { + uploadMemoryAvailable.release(blobSizeKBytes); } - cache.put(key, actionOutput.build().toByteArray()); + return digest; } @Override - public void putActionOutput(String key, Path execRoot, Collection<Path> files) - throws IOException, InterruptedException { - CacheEntry.Builder actionOutput = CacheEntry.newBuilder(); - for (Path file : files) { - addToActionOutput(file, file.relativeTo(execRoot).getPathString(), actionOutput); + public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException { + if (digest.getSizeBytes() == 0) { + return new byte[0]; + } + // This unconditionally downloads the whole blob into memory! + Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES); + byte[] data = cache.get(ContentDigests.toHexString(digest)); + if (data == null) { + throw new CacheNotFoundException(digest); } - cache.put(key, actionOutput.build().toByteArray()); + return data; } - /** Add the file to action output cache entry. Put the file to cache if necessary. */ - private void addToActionOutput(Path file, String execPathString, CacheEntry.Builder actionOutput) - throws IOException, InterruptedException { - if (file.isDirectory()) { - // TODO(alpha): Implement this for directory. - throw new UnsupportedOperationException("Storing a directory is not yet supported."); + @Override + public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests) + throws CacheNotFoundException { + ArrayList<byte[]> blobs = new ArrayList<>(); + for (ContentDigest c : digests) { + blobs.add(downloadBlob(c)); } - // First put the file content to cache. - String contentKey = putFileIfNotExist(file); - // Add to protobuf. - actionOutput - .addFilesBuilder() - .setPath(execPathString) - .setContentKey(contentKey) - .setExecutable(file.isExecutable()); + return ImmutableList.copyOf(blobs); } - private String putFileIfNotExist(Path file) throws IOException, InterruptedException { - String contentKey = HashCode.fromBytes(file.getMD5Digest()).toString(); - if (containsFile(contentKey)) { - return contentKey; + @Override + public ActionResult getCachedActionResult(ActionKey actionKey) { + byte[] data = cache.get(ContentDigests.toHexString(actionKey.getDigest())); + if (data == null) { + return null; } - putFile(contentKey, file); - return contentKey; + try { + return ActionResult.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + return null; + } + } + + @Override + public void setCachedActionResult(ActionKey actionKey, ActionResult result) + throws InterruptedException { + cache.put(ContentDigests.toHexString(actionKey.getDigest()), result.toByteArray()); } } |