diff options
9 files changed, 217 insertions, 309 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java index 62605e1c2d..449b24c833 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java @@ -108,9 +108,7 @@ final class CachedLocalSpawnRunner implements SpawnRunner { // For now, download all outputs locally; in the future, we can reuse the digests to // just update the TreeNodeRepository and continue the build. try { - // TODO(ulfjack): Download stdout, stderr, and the output files in a single call. - remoteCache.downloadAllResults(result, execRoot); - passRemoteOutErr(result, policy.getFileOutErr()); + remoteCache.download(result, execRoot, policy.getFileOutErr()); return new SpawnResult.Builder() .setStatus(Status.SUCCESS) .setExitCode(result.getExitCode()) @@ -164,30 +162,6 @@ final class CachedLocalSpawnRunner implements SpawnRunner { return command.build(); } - private void passRemoteOutErr(ActionResult result, FileOutErr outErr) throws IOException { - try { - if (!result.getStdoutRaw().isEmpty()) { - result.getStdoutRaw().writeTo(outErr.getOutputStream()); - outErr.getOutputStream().flush(); - } else if (result.hasStdoutDigest()) { - byte[] stdoutBytes = remoteCache.downloadBlob(result.getStdoutDigest()); - outErr.getOutputStream().write(stdoutBytes); - outErr.getOutputStream().flush(); - } - if (!result.getStderrRaw().isEmpty()) { - result.getStderrRaw().writeTo(outErr.getErrorStream()); - outErr.getErrorStream().flush(); - } else if (result.hasStderrDigest()) { - byte[] stderrBytes = remoteCache.downloadBlob(result.getStderrDigest()); - outErr.getErrorStream().write(stderrBytes); - outErr.getErrorStream().flush(); - } - } catch (CacheNotFoundException e) { - outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss."); - outErr.getOutputStream().flush(); - } - } - private void writeCacheEntry(Spawn spawn, FileOutErr outErr, ActionKey actionKey) throws IOException, InterruptedException { ArrayList<Path> outputFiles = new ArrayList<>(); @@ -199,12 +173,6 @@ final class CachedLocalSpawnRunner implements SpawnRunner { outputFiles.add(outputPath); } } - ActionResult.Builder result = ActionResult.newBuilder(); - remoteCache.uploadAllResults(execRoot, outputFiles, result); - Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath()); - Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath()); - result.setStderrDigest(stderr); - result.setStdoutDigest(stdout); - remoteCache.setCachedActionResult(actionKey, result.build()); + remoteCache.upload(actionKey, execRoot, outputFiles, outErr); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java index d93842244d..ba8a9d4257 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java @@ -32,6 +32,7 @@ import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc; @@ -39,6 +40,7 @@ import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlo import com.google.devtools.remoteexecution.v1test.ActionResult; import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest; import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse; +import com.google.devtools.remoteexecution.v1test.Command; import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc; import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub; import com.google.devtools.remoteexecution.v1test.Digest; @@ -145,8 +147,9 @@ public class GrpcActionCache implements RemoteActionCache { * reassembled remotely using the root digest. */ @Override - public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root) - throws IOException, InterruptedException { + public void ensureInputsPresent( + TreeNodeRepository repository, Path execRoot, TreeNode root, Command command) + throws IOException, InterruptedException { repository.computeMerkleDigests(root); // TODO(olaola): avoid querying all the digests, only ask for novel subtrees. ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root)); @@ -176,6 +179,7 @@ public class GrpcActionCache implements RemoteActionCache { } } } + uploadBlob(command.toByteArray()); if (!actionInputs.isEmpty()) { uploadChunks( actionInputs.size(), @@ -189,9 +193,8 @@ public class GrpcActionCache implements RemoteActionCache { /** * Download the entire tree data rooted by the given digest and write it into the given location. */ - @Override - public void downloadTree(Digest rootDigest, Path rootLocation) - throws IOException, CacheNotFoundException { + @SuppressWarnings("unused") + private void downloadTree(Digest rootDigest, Path rootLocation) { throw new UnsupportedOperationException(); } @@ -200,11 +203,8 @@ public class GrpcActionCache implements RemoteActionCache { * include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating. */ @Override - public void downloadAllResults(ActionResult result, Path execRoot) + public void download(ActionResult result, Path execRoot, FileOutErr outErr) throws IOException, CacheNotFoundException { - if (result.getOutputFilesList().isEmpty() && result.getOutputDirectoriesList().isEmpty()) { - return; - } for (OutputFile file : result.getOutputFilesList()) { Path path = execRoot.getRelative(file.getPath()); FileSystemUtils.createDirectoryAndParents(path.getParentDirectory()); @@ -229,6 +229,28 @@ public class GrpcActionCache implements RemoteActionCache { for (OutputDirectory directory : result.getOutputDirectoriesList()) { downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath())); } + // TODO(ulfjack): use same code as above also for stdout / stderr if applicable. + downloadOutErr(result, outErr); + } + + private void downloadOutErr(ActionResult result, FileOutErr outErr) + throws IOException, CacheNotFoundException { + if (!result.getStdoutRaw().isEmpty()) { + result.getStdoutRaw().writeTo(outErr.getOutputStream()); + outErr.getOutputStream().flush(); + } else if (result.hasStdoutDigest()) { + byte[] stdoutBytes = downloadBlob(result.getStdoutDigest()); + outErr.getOutputStream().write(stdoutBytes); + outErr.getOutputStream().flush(); + } + if (!result.getStderrRaw().isEmpty()) { + result.getStderrRaw().writeTo(outErr.getErrorStream()); + outErr.getErrorStream().flush(); + } else if (result.hasStderrDigest()) { + byte[] stderrBytes = downloadBlob(result.getStderrDigest()); + outErr.getErrorStream().write(stderrBytes); + outErr.getErrorStream().flush(); + } } private Iterator<ReadResponse> readBlob(Digest digest) throws CacheNotFoundException { @@ -249,10 +271,30 @@ public class GrpcActionCache implements RemoteActionCache { } } - /** Upload all results of a locally executed action to the cache. */ @Override - public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result) - throws IOException, InterruptedException { + public void upload( + ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr) + throws IOException, InterruptedException { + ActionResult.Builder result = ActionResult.newBuilder(); + upload(execRoot, files, outErr, result); + try { + acBlockingStub + .get() + .updateActionResult( + UpdateActionResultRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setActionDigest(actionKey.getDigest()) + .setActionResult(result) + .build()); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) { + throw e; + } + } + } + + void upload(Path execRoot, Collection<Path> files, FileOutErr outErr, ActionResult.Builder result) + throws IOException, InterruptedException { ArrayList<Digest> digests = new ArrayList<>(); Chunker.Builder b = new Chunker.Builder(); for (Path file : files) { @@ -282,6 +324,15 @@ public class GrpcActionCache implements RemoteActionCache { .setDigest(digests.get(index++)) .setIsExecutable(file.isExecutable()); } + // TODO(ulfjack): Use the Chunker also for stdout / stderr. + if (outErr.getErrorPath().exists()) { + Digest stderr = uploadFileContents(outErr.getErrorPath()); + result.setStderrDigest(stderr); + } + if (outErr.getOutputPath().exists()) { + Digest stdout = uploadFileContents(outErr.getOutputPath()); + result.setStdoutDigest(stdout); + } } /** @@ -290,8 +341,7 @@ public class GrpcActionCache implements RemoteActionCache { * * @return The key for fetching the file contents blob from cache. */ - @Override - public Digest uploadFileContents(Path file) throws IOException, InterruptedException { + private Digest uploadFileContents(Path file) throws IOException, InterruptedException { Digest digest = Digests.computeDigest(file); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { @@ -306,8 +356,7 @@ public class GrpcActionCache implements RemoteActionCache { * * @return The key for fetching the file contents blob from cache. */ - @Override - public Digest uploadFileContents( + Digest uploadFileContents( ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException, InterruptedException { Digest digest = Digests.getDigestFromInputCache(input, inputCache); @@ -392,8 +441,7 @@ public class GrpcActionCache implements RemoteActionCache { } } - @Override - public Digest uploadBlob(byte[] blob) throws InterruptedException { + Digest uploadBlob(byte[] blob) throws InterruptedException { Digest digest = Digests.computeDigest(blob); ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); try { @@ -407,8 +455,7 @@ public class GrpcActionCache implements RemoteActionCache { } } - @Override - public byte[] downloadBlob(Digest digest) throws CacheNotFoundException { + byte[] downloadBlob(Digest digest) throws CacheNotFoundException { if (digest.getSizeBytes() == 0) { return new byte[0]; } @@ -444,24 +491,4 @@ public class GrpcActionCache implements RemoteActionCache { throw e; } } - - /** Sets the given result as result of the given Action. */ - @Override - public void setCachedActionResult(ActionKey actionKey, ActionResult result) - throws InterruptedException { - try { - acBlockingStub - .get() - .updateActionResult( - UpdateActionResultRequest.newBuilder() - .setInstanceName(options.remoteInstanceName) - .setActionDigest(actionKey.getDigest()) - .setActionResult(result) - .build()); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) { - throw e; - } - } - } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java index 84fdb29430..44a30f5745 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java @@ -14,14 +14,13 @@ package com.google.devtools.build.lib.remote; -import com.google.devtools.build.lib.actions.ActionInput; -import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; +import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.remoteexecution.v1test.ActionResult; -import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.Command; import java.io.IOException; import java.util.Collection; import javax.annotation.Nullable; @@ -35,61 +34,44 @@ interface RemoteActionCache { // ways to signal a cache miss. /** - * Upload enough of the tree metadata and data into remote cache so that the entire tree can be - * reassembled remotely using the root digest. + * Ensures that the tree structure of the inputs, the input files themselves, and the command are + * available in the remote cache, such that the tree can be reassembled and executed on another + * machine given the root digest. + * + * <p>The cache may check whether files or parts of the tree structure are already present, and do + * not need to be uploaded again. + * + * <p>Note that this method is only required for remote execution, not for caching itself. + * However, remote execution uses a cache to store input files, and that may be a separate + * end-point from the executor itself, so the functionality lives here. A pure remote caching + * implementation that does not support remote execution may choose not to implement this + * function, and throw {@link UnsupportedOperationException} instead. If so, it should be clearly + * documented that it cannot be used for remote execution. */ - void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root) - throws IOException, InterruptedException; + void ensureInputsPresent( + TreeNodeRepository repository, Path execRoot, TreeNode root, Command command) + throws IOException, InterruptedException; /** - * Download the entire tree data rooted by the given digest and write it into the given location. + * Download the output files and directory trees of a remotely executed action to the local + * machine, as well stdin / stdout to the given files. */ - void downloadTree(Digest rootDigest, Path rootLocation) + // TODO(olaola): will need to amend to include the TreeNodeRepository for updating. + void download(ActionResult result, Path execRoot, FileOutErr outErr) throws IOException, CacheNotFoundException; /** - * Download all results of a remotely executed action locally. TODO(olaola): will need to amend to - * include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating. - */ - void downloadAllResults(ActionResult result, Path execRoot) - throws IOException, CacheNotFoundException; - - /** - * Upload all results of a locally executed action to the cache. Add the files to the ActionResult - * builder. - */ - void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result) - throws IOException, InterruptedException; - - /** - * Put the file contents in cache if it is not already in it. No-op if the file is already stored - * in cache. The given path must be a full absolute path. - * - * @return The key for fetching the file contents blob from cache. + * Attempts to look up the given action in the remote cache and return its result, if present. + * Returns {@code null} if there is no such entry. Note that a successful result from this method + * does not guarantee the availability of the corresponding output files in the remote cache. */ - Digest uploadFileContents(Path file) throws IOException, InterruptedException; + @Nullable + ActionResult getCachedActionResult(ActionKey actionKey); /** - * Put the input file contents in cache if it is not already in it. No-op if the data is already - * stored in cache. - * - * @return The key for fetching the file contents blob from cache. + * Upload the result of a locally executed action to the cache by uploading any necessary files, + * stdin / stdout, as well as adding an entry for the given action key to the cache. */ - Digest uploadFileContents(ActionInput input, Path execRoot, ActionInputFileCache inputCache) + void upload(ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr) throws IOException, InterruptedException; - - /** Upload the given blob to the cache, and return its digests. */ - Digest uploadBlob(byte[] blob) throws InterruptedException; - - /** Download and return a blob with a given digest from the cache. */ - byte[] downloadBlob(Digest digest) throws CacheNotFoundException; - - // Execution Cache API - - /** Returns a cached result for a given Action digest, or null if not found in cache. */ - @Nullable - ActionResult getCachedActionResult(ActionKey actionKey); - - /** Sets the given result as result of the given Action. */ - void setCachedActionResult(ActionKey actionKey, ActionResult result) throws InterruptedException; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index 412031bb1b..d70dc15937 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -26,7 +26,6 @@ import com.google.devtools.build.lib.exec.SpawnResult.Status; import com.google.devtools.build.lib.exec.SpawnRunner; import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; -import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.remoteexecution.v1test.Action; @@ -108,8 +107,7 @@ final class RemoteSpawnRunner implements SpawnRunner { if (result == null) { // Cache miss or we don't accept cache hits. // Upload the command and all the inputs into the remote cache. - remoteCache.uploadBlob(command.toByteArray()); - remoteCache.uploadTree(repository, execRoot, inputRoot); + remoteCache.ensureInputsPresent(repository, execRoot, inputRoot, command); // TODO(olaola): set BuildInfo and input total bytes as well. ExecuteRequest.Builder request = ExecuteRequest.newBuilder() @@ -120,9 +118,7 @@ final class RemoteSpawnRunner implements SpawnRunner { result = executor.executeRemotely(request.build()).getResult(); } - // TODO(ulfjack): Download stdout, stderr, and the output files in a single call. - passRemoteOutErr(remoteCache, result, policy.getFileOutErr()); - remoteCache.downloadAllResults(result, execRoot); + remoteCache.download(result, execRoot, policy.getFileOutErr()); return new SpawnResult.Builder() .setStatus(Status.SUCCESS) // Even if the action failed with non-zero exit code. .setExitCode(result.getExitCode()) @@ -162,29 +158,4 @@ final class RemoteSpawnRunner implements SpawnRunner { } return command.build(); } - - private static void passRemoteOutErr( - RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException { - try { - if (!result.getStdoutRaw().isEmpty()) { - result.getStdoutRaw().writeTo(outErr.getOutputStream()); - outErr.getOutputStream().flush(); - } else if (result.hasStdoutDigest()) { - byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest()); - outErr.getOutputStream().write(stdoutBytes); - outErr.getOutputStream().flush(); - } - if (!result.getStderrRaw().isEmpty()) { - result.getStderrRaw().writeTo(outErr.getErrorStream()); - outErr.getErrorStream().flush(); - } else if (result.hasStderrDigest()) { - byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest()); - outErr.getErrorStream().write(stderrBytes); - outErr.getErrorStream().flush(); - } - } catch (CacheNotFoundException e) { - outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss."); - outErr.getOutputStream().flush(); - } - } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java index f33a28cf5a..43bb3770ab 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java @@ -34,7 +34,6 @@ import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.rules.fileset.FilesetActionContext; import com.google.devtools.build.lib.util.CommandFailureUtils; -import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.remoteexecution.v1test.Action; @@ -181,19 +180,8 @@ final class RemoteSpawnStrategy implements SpawnActionContext { outputFiles.add(outputFile); } try { - ActionResult.Builder result = ActionResult.newBuilder(); - remoteCache.uploadAllResults(execRoot, outputFiles, result); - FileOutErr outErr = actionExecutionContext.getFileOutErr(); - if (outErr.getErrorPath().exists()) { - Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath()); - result.setStderrDigest(stderr); - } - if (outErr.getOutputPath().exists()) { - Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath()); - result.setStdoutDigest(stdout); - } - remoteCache.setCachedActionResult(actionKey, result.build()); - // Handle all cache errors here. + remoteCache.upload( + actionKey, execRoot, outputFiles, actionExecutionContext.getFileOutErr()); } catch (IOException e) { throw new UserExecException("Unexpected IO error.", e); } catch (UnsupportedOperationException e) { @@ -210,31 +198,6 @@ final class RemoteSpawnStrategy implements SpawnActionContext { } } - private static void passRemoteOutErr( - RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException { - try { - if (!result.getStdoutRaw().isEmpty()) { - result.getStdoutRaw().writeTo(outErr.getOutputStream()); - outErr.getOutputStream().flush(); - } else if (result.hasStdoutDigest()) { - byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest()); - outErr.getOutputStream().write(stdoutBytes); - outErr.getOutputStream().flush(); - } - if (!result.getStderrRaw().isEmpty()) { - result.getStderrRaw().writeTo(outErr.getErrorStream()); - outErr.getErrorStream().flush(); - } else if (result.hasStderrDigest()) { - byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest()); - outErr.getErrorStream().write(stderrBytes); - outErr.getErrorStream().flush(); - } - } catch (CacheNotFoundException e) { - outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss."); - outErr.getOutputStream().flush(); - } - } - @Override public String toString() { return "remote"; @@ -292,8 +255,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext { // For now, download all outputs locally; in the future, we can reuse the digests to // just update the TreeNodeRepository and continue the build. try { - remoteCache.downloadAllResults(result, execRoot); - passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr()); + remoteCache.download(result, execRoot, actionExecutionContext.getFileOutErr()); return; } catch (CacheNotFoundException e) { acceptCachedResult = false; // Retry the action remotely and invalidate the results. @@ -306,8 +268,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext { } // Upload the command and all the inputs into the remote cache. - remoteCache.uploadBlob(command.toByteArray()); - remoteCache.uploadTree(repository, execRoot, inputRoot); + remoteCache.ensureInputsPresent(repository, execRoot, inputRoot, command); // TODO(olaola): set BuildInfo and input total bytes as well. ExecuteRequest.Builder request = ExecuteRequest.newBuilder() @@ -321,8 +282,7 @@ final class RemoteSpawnStrategy implements SpawnActionContext { execLocally(spawn, actionExecutionContext, remoteCache, actionKey); return; } - passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr()); - remoteCache.downloadAllResults(result, execRoot); + remoteCache.download(result, execRoot, actionExecutionContext.getFileOutErr()); if (result.getExitCode() != 0) { String cwd = actionExecutionContext.getExecRoot().getPathString(); String message = diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java index 75d208a687..55e1860561 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java @@ -21,9 +21,11 @@ import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Command; import com.google.devtools.remoteexecution.v1test.Digest; import com.google.devtools.remoteexecution.v1test.Directory; import com.google.devtools.remoteexecution.v1test.DirectoryNode; @@ -43,11 +45,15 @@ import java.util.concurrent.Semaphore; * and action output. * * <p>The thread safety is guaranteed by the underlying map. + * + * <p>Note that this class is used from src/tools/remote_worker. */ @ThreadSafe public final class SimpleBlobStoreActionCache implements RemoteActionCache { - private final SimpleBlobStore blobStore; private static final int MAX_MEMORY_KBYTES = 512 * 1024; + private static final int MAX_BLOB_SIZE_FOR_INLINE = 10 * 1024; + + private final SimpleBlobStore blobStore; private final Semaphore uploadMemoryAvailable = new Semaphore(MAX_MEMORY_KBYTES, true); public SimpleBlobStoreActionCache(SimpleBlobStore blobStore) { @@ -55,9 +61,11 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } @Override - public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root) - throws IOException, InterruptedException { + public void ensureInputsPresent( + TreeNodeRepository repository, Path execRoot, TreeNode root, Command command) + throws IOException, InterruptedException { repository.computeMerkleDigests(root); + uploadBlob(command.toByteArray()); for (Directory directory : repository.treeToDirectories(root)) { uploadBlob(directory.toByteArray()); } @@ -67,7 +75,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } } - @Override public void downloadTree(Digest rootDigest, Path rootLocation) throws IOException, CacheNotFoundException { Directory directory = Directory.parseFrom(downloadBlob(rootDigest)); @@ -80,16 +87,14 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } } - @Override - public Digest uploadFileContents(Path file) throws IOException, InterruptedException { + private Digest uploadFileContents(Path file) throws IOException, InterruptedException { // This unconditionally reads the whole file into memory first! return uploadBlob(ByteString.readFrom(file.getInputStream()).toByteArray()); } - @Override - public Digest uploadFileContents( + private Digest uploadFileContents( ActionInput input, Path execRoot, ActionInputFileCache inputCache) - throws IOException, InterruptedException { + throws IOException, InterruptedException { // This unconditionally reads the whole file into memory first! if (input instanceof VirtualActionInput) { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); @@ -104,7 +109,7 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } @Override - public void downloadAllResults(ActionResult result, Path execRoot) + public void download(ActionResult result, Path execRoot, FileOutErr outErr) throws IOException, CacheNotFoundException { for (OutputFile file : result.getOutputFilesList()) { if (!file.getContent().isEmpty()) { @@ -120,12 +125,47 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { for (OutputDirectory directory : result.getOutputDirectoriesList()) { downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath())); } + downloadOutErr(result, outErr); + } + + private void downloadOutErr(ActionResult result, FileOutErr outErr) + throws IOException, CacheNotFoundException { + if (!result.getStdoutRaw().isEmpty()) { + result.getStdoutRaw().writeTo(outErr.getOutputStream()); + outErr.getOutputStream().flush(); + } else if (result.hasStdoutDigest()) { + downloadFileContents(result.getStdoutDigest(), outErr.getOutputPath(), /*executable=*/false); + } + if (!result.getStderrRaw().isEmpty()) { + result.getStderrRaw().writeTo(outErr.getErrorStream()); + outErr.getErrorStream().flush(); + } else if (result.hasStderrDigest()) { + downloadFileContents(result.getStderrDigest(), outErr.getErrorPath(), /*executable=*/false); + } } @Override - public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result) + public void upload( + ActionKey actionKey, Path execRoot, Collection<Path> files, FileOutErr outErr) + throws IOException, InterruptedException { + ActionResult.Builder result = ActionResult.newBuilder(); + upload(result, execRoot, files); + if (outErr.getErrorPath().exists()) { + Digest stderr = uploadFileContents(outErr.getErrorPath()); + result.setStderrDigest(stderr); + } + if (outErr.getOutputPath().exists()) { + Digest stdout = uploadFileContents(outErr.getOutputPath()); + result.setStdoutDigest(stdout); + } + blobStore.put(actionKey.getDigest().getHash(), result.build().toByteArray()); + } + + public void upload(ActionResult.Builder result, Path execRoot, Collection<Path> files) throws IOException, InterruptedException { for (Path file : files) { + // TODO(ulfjack): Maybe pass in a SpawnResult here, add a list of output files to that, and + // rely on the local spawn runner to stat the files, instead of statting here. if (!file.exists()) { continue; } @@ -146,6 +186,20 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } } + public void uploadOutErr(ActionResult.Builder result, byte[] stdout, byte[] stderr) + throws InterruptedException { + if (stdout.length <= MAX_BLOB_SIZE_FOR_INLINE) { + result.setStdoutRaw(ByteString.copyFrom(stdout)); + } else if (stdout.length > 0) { + result.setStdoutDigest(uploadBlob(stdout)); + } + if (stderr.length <= MAX_BLOB_SIZE_FOR_INLINE) { + result.setStderrRaw(ByteString.copyFrom(stderr)); + } else if (stderr.length > 0) { + result.setStderrDigest(uploadBlob(stderr)); + } + } + private void downloadFileContents(Digest digest, Path dest, boolean executable) throws IOException, CacheNotFoundException { // This unconditionally downloads the whole file into memory first! @@ -168,7 +222,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { MAX_MEMORY_KBYTES); } - @Override public Digest uploadBlob(byte[] blob) throws InterruptedException { return uploadBlob(blob, Digests.computeDigest(blob)); } @@ -185,7 +238,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { return digest; } - @Override public byte[] downloadBlob(Digest digest) throws CacheNotFoundException { if (digest.getSizeBytes() == 0) { return new byte[0]; @@ -216,7 +268,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } } - @Override public void setCachedActionResult(ActionKey actionKey, ActionResult result) throws InterruptedException { blobStore.put(actionKey.getDigest().getHash(), result.toByteArray()); diff --git a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java index ce445f7d4e..15eab5a4a1 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java @@ -14,9 +14,9 @@ package com.google.devtools.build.lib.remote; import static com.google.common.truth.Truth.assertThat; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -38,6 +38,7 @@ import com.google.devtools.build.lib.exec.SpawnRunner.ProgressStatus; import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionPolicy; import com.google.devtools.build.lib.exec.util.FakeOwner; import com.google.devtools.build.lib.remote.Digests.ActionKey; +import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.FileSystemUtils; @@ -46,8 +47,7 @@ import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.devtools.common.options.Options; import com.google.devtools.remoteexecution.v1test.ActionResult; -import com.google.devtools.remoteexecution.v1test.Digest; -import com.google.protobuf.ByteString; +import com.google.devtools.remoteexecution.v1test.Command; import java.io.IOException; import java.util.Collection; import java.util.SortedMap; @@ -55,9 +55,8 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; +import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.mockito.MockitoAnnotations.Mock; /** Tests for {@link CachedLocalSpawnRunner}. */ @RunWith(JUnit4.class) @@ -76,7 +75,7 @@ public class CachedLocalSpawnRunnerTest { private FakeActionInputFileCache fakeFileCache; @Mock private RemoteActionCache cache; @Mock private SpawnRunner delegate; - CachedLocalSpawnRunner runner; + private CachedLocalSpawnRunner runner; private FileOutErr outErr; private final SpawnExecutionPolicy simplePolicy = @@ -147,85 +146,38 @@ public class CachedLocalSpawnRunnerTest { when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult); SpawnResult result = runner.exec(simpleSpawn, simplePolicy); - // In line with state-based testing, we only verify calls that produce side effects. - verify(cache).downloadAllResults(actionResult, execRoot); + // All other methods on RemoteActionCache have side effects, so we verify all of them. + verify(cache).download(actionResult, execRoot, outErr); + verify(cache, never()) + .ensureInputsPresent( + any(TreeNodeRepository.class), + any(Path.class), + any(TreeNode.class), + any(Command.class)); + verify(cache, never()) + .upload( + any(ActionKey.class), any(Path.class), any(Collection.class), any(FileOutErr.class)); assertThat(result.setupSuccess()).isTrue(); assertThat(result.exitCode()).isEqualTo(0); + // We expect the CachedLocalSpawnRunner to _not_ write to outErr at all. assertThat(outErr.hasRecordedOutput()).isFalse(); assertThat(outErr.hasRecordedStderr()).isFalse(); } @SuppressWarnings("unchecked") @Test - public void cacheHitWithOutput() throws Exception { - byte[] cacheStdOut = "stdout".getBytes(UTF_8); - byte[] cacheStdErr = "stderr".getBytes(UTF_8); - Digest stdOutDigest = Digests.computeDigest(cacheStdOut); - Digest stdErrDigest = Digests.computeDigest(cacheStdErr); - - ActionResult actionResult = - ActionResult.newBuilder() - .setStdoutDigest(stdOutDigest) - .setStderrDigest(stdErrDigest) - .build(); - when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult); - when(cache.downloadBlob(stdOutDigest)).thenReturn(cacheStdOut); - when(cache.downloadBlob(stdErrDigest)).thenReturn(cacheStdErr); - - SpawnResult result = runner.exec(simpleSpawn, simplePolicy); - // In line with state-based testing, we only verify calls that produce side effects. - verify(cache).downloadAllResults(actionResult, execRoot); - assertThat(result.setupSuccess()).isTrue(); - assertThat(result.exitCode()).isEqualTo(0); - assertThat(outErr.outAsLatin1()).isEqualTo("stdout"); - assertThat(outErr.errAsLatin1()).isEqualTo("stderr"); - } - - @SuppressWarnings("unchecked") - @Test - public void cacheHitWithOutputsInlined() throws Exception { - ActionResult actionResult = - ActionResult.newBuilder() - .setStdoutRaw(ByteString.copyFromUtf8("stdout")) - .setStderrRaw(ByteString.copyFromUtf8("stderr")) - .build(); - when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult); - - SpawnResult result = runner.exec(simpleSpawn, simplePolicy); - // In line with state-based testing, we only verify calls that produce side effects. - verify(cache).downloadAllResults(actionResult, execRoot); - assertThat(result.setupSuccess()).isTrue(); - assertThat(result.exitCode()).isEqualTo(0); - assertThat(outErr.outAsLatin1()).isEqualTo("stdout"); - assertThat(outErr.errAsLatin1()).isEqualTo("stderr"); - } - - @SuppressWarnings("unchecked") - @Test public void cacheMiss() throws Exception { - byte[] cacheStdOut = "stdout".getBytes(UTF_8); - byte[] cacheStdErr = "stderr".getBytes(UTF_8); - Digest stdOutDigest = Digests.computeDigest(cacheStdOut); - Digest stdErrDigest = Digests.computeDigest(cacheStdErr); - when(cache.uploadFileContents(any(Path.class))).thenReturn(stdErrDigest, stdOutDigest); - SpawnResult delegateResult = - new SpawnResult.Builder().setExitCode(0).setStatus(Status.SUCCESS).build(); when(delegate.exec(any(Spawn.class), any(SpawnExecutionPolicy.class))) - .thenReturn(delegateResult); + .thenReturn(new SpawnResult.Builder().setExitCode(0).setStatus(Status.SUCCESS).build()); SpawnResult result = runner.exec(simpleSpawn, simplePolicy); assertThat(result.setupSuccess()).isTrue(); assertThat(result.exitCode()).isEqualTo(0); - // We use verify to check that each method is called the correct number of times. verify(cache) - .uploadAllResults(any(Path.class), any(Collection.class), any(ActionResult.Builder.class)); - // Two additional uploads for stdout and stderr. - verify(cache, Mockito.times(2)).uploadFileContents(any(Path.class)); - ActionResult actionResult = - ActionResult.newBuilder() - .setStdoutDigest(stdOutDigest) - .setStderrDigest(stdErrDigest) - .build(); - verify(cache).setCachedActionResult(any(ActionKey.class), eq(actionResult)); + .upload( + any(ActionKey.class), + any(Path.class), + any(Collection.class), + eq(outErr)); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java index b3595dca3e..b896ab5d01 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; import com.google.devtools.build.lib.testutil.Scratch; +import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.FileSystem.HashFunction; import com.google.devtools.build.lib.vfs.FileSystemUtils; @@ -67,6 +68,7 @@ import org.mockito.stubbing.Answer; public class GrpcActionCacheTest { private FileSystem fs; private Path execRoot; + private FileOutErr outErr; private FakeActionInputFileCache fakeFileCache; private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); private final String fakeServerName = "fake server for " + getClass(); @@ -87,6 +89,12 @@ public class GrpcActionCacheTest { execRoot = fs.getPath("/exec/root"); FileSystemUtils.createDirectoryAndParents(execRoot); fakeFileCache = new FakeActionInputFileCache(execRoot); + + Path stdout = fs.getPath("/tmp/stdout"); + Path stderr = fs.getPath("/tmp/stderr"); + FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory()); + FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory()); + outErr = new FileOutErr(stdout, stderr); } @After @@ -194,7 +202,7 @@ public class GrpcActionCacheTest { result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); result.addOutputFilesBuilder().setPath("b/empty").setDigest(emptyDigest); result.addOutputFilesBuilder().setPath("a/bar").setDigest(barDigest).setIsExecutable(true); - client.downloadAllResults(result.build(), execRoot); + client.download(result.build(), execRoot, null); assertThat(Digests.computeDigest(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest); assertThat(Digests.computeDigest(execRoot.getRelative("b/empty"))).isEqualTo(emptyDigest); assertThat(Digests.computeDigest(execRoot.getRelative("a/bar"))).isEqualTo(barDigest); @@ -380,7 +388,7 @@ public class GrpcActionCacheTest { }); ActionResult.Builder result = ActionResult.newBuilder(); - client.uploadAllResults(execRoot, ImmutableList.<Path>of(fooFile, barFile), result); + client.upload(execRoot, ImmutableList.<Path>of(fooFile, barFile), outErr, result); ActionResult.Builder expectedResult = ActionResult.newBuilder(); expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); expectedResult @@ -429,7 +437,7 @@ public class GrpcActionCacheTest { .thenAnswer(blobChunkedWriteAnswer("x", 1)); ActionResult.Builder result = ActionResult.newBuilder(); - client.uploadAllResults(execRoot, ImmutableList.<Path>of(fooFile, barFile), result); + client.upload(execRoot, ImmutableList.<Path>of(fooFile, barFile), outErr, result); ActionResult.Builder expectedResult = ActionResult.newBuilder(); expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); expectedResult diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java index 2dc3afe01d..48618a60af 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/WatcherServer.java @@ -21,6 +21,7 @@ import static java.util.logging.Level.WARNING; import com.google.common.base.Throwables; import com.google.devtools.build.lib.remote.CacheNotFoundException; import com.google.devtools.build.lib.remote.Digests; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; import com.google.devtools.build.lib.shell.AbnormalTerminationException; import com.google.devtools.build.lib.shell.Command; @@ -37,7 +38,6 @@ import com.google.devtools.remoteexecution.v1test.ExecuteResponse; import com.google.devtools.remoteexecution.v1test.Platform; import com.google.longrunning.Operation; import com.google.protobuf.Any; -import com.google.protobuf.ByteString; import com.google.protobuf.Duration; import com.google.protobuf.util.Durations; import com.google.rpc.Code; @@ -75,7 +75,6 @@ final class WatcherServer extends WatcherImplBase { // How long to wait for the uid command. private static final Duration uidTimeout = Durations.fromMicros(30); - private static final int MAX_BLOB_SIZE_FOR_INLINE = 10 * 1024; private static final int LOCAL_EXEC_ERROR = -1; private final Path workPath; @@ -195,25 +194,10 @@ final class WatcherServer extends WatcherImplBase { new File(pathString)); } - private void passOutErr(byte[] stdout, byte[] stderr, ActionResult.Builder result) - throws InterruptedException { - if (stdout.length <= MAX_BLOB_SIZE_FOR_INLINE) { - result.setStdoutRaw(ByteString.copyFrom(stdout)); - } else if (stdout.length > 0) { - result.setStdoutDigest(cache.uploadBlob(stdout)); - } - if (stderr.length <= MAX_BLOB_SIZE_FOR_INLINE) { - result.setStderrRaw(ByteString.copyFrom(stderr)); - } else if (stderr.length > 0) { - result.setStderrDigest(cache.uploadBlob(stderr)); - } - } - public ActionResult execute(Action action, Path execRoot) throws IOException, InterruptedException, IllegalArgumentException, CacheNotFoundException { - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - ActionResult.Builder result = ActionResult.newBuilder(); + ByteArrayOutputStream stdoutBuffer = new ByteArrayOutputStream(); + ByteArrayOutputStream stderrBuffer = new ByteArrayOutputStream(); com.google.devtools.remoteexecution.v1test.Command command = com.google.devtools.remoteexecution.v1test.Command.parseFrom( cache.downloadBlob(action.getCommandDigest())); @@ -241,7 +225,8 @@ final class WatcherServer extends WatcherImplBase { long startTime = System.currentTimeMillis(); CommandResult cmdResult = null; try { - cmdResult = cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true); + cmdResult = + cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdoutBuffer, stderrBuffer, true); } catch (AbnormalTerminationException e) { cmdResult = e.getResult(); } catch (CommandException e) { @@ -271,11 +256,15 @@ final class WatcherServer extends WatcherImplBase { exitCode = cmdResult.getTerminationStatus().getRawExitCode(); } - passOutErr(stdout.toByteArray(), stderr.toByteArray(), result); - cache.uploadAllResults(execRoot, outputs, result); + ActionResult.Builder result = ActionResult.newBuilder(); + cache.upload(result, execRoot, outputs); + byte[] stdout = stdoutBuffer.toByteArray(); + byte[] stderr = stderrBuffer.toByteArray(); + cache.uploadOutErr(result, stdout, stderr); ActionResult finalResult = result.setExitCode(exitCode).build(); if (exitCode == 0) { - cache.setCachedActionResult(Digests.computeActionKey(action), finalResult); + ActionKey actionKey = Digests.computeActionKey(action); + cache.setCachedActionResult(actionKey, finalResult); } return finalResult; } |