diff options
34 files changed, 2029 insertions, 2517 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 3a3c304d1e..d2e82efa60 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -23,8 +23,6 @@ java_library( "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/standalone", "//src/main/java/com/google/devtools/common/options", - "//src/main/protobuf:remote_protocol_java_grpc", - "//src/main/protobuf:remote_protocol_java_proto", "//third_party:apache_httpclient", "//third_party:apache_httpcore", "//third_party:auth", @@ -35,6 +33,13 @@ java_library( "//third_party:netty", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", + "//third_party/protobuf:protobuf_java_util", + "@googleapis//:google_bytestream_bytestream_java_grpc", + "@googleapis//:google_bytestream_bytestream_java_proto", + "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc", + "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto", + "@googleapis//:google_longrunning_operations_java_proto", + "@googleapis//:google_rpc_error_details_java_proto", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java index 5820e0bf95..cffc58fd4b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java +++ b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java @@ -14,25 +14,25 @@ package com.google.devtools.build.lib.remote; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.remoteexecution.v1test.Digest; /** * An exception to indicate cache misses. * TODO(olaola): have a class of checked RemoteCacheExceptions. */ public final class CacheNotFoundException extends Exception { - private final ContentDigest missingDigest; + private final Digest missingDigest; - CacheNotFoundException(ContentDigest missingDigest) { + CacheNotFoundException(Digest missingDigest) { this.missingDigest = missingDigest; } - public ContentDigest getMissingDigest() { + public Digest getMissingDigest() { return missingDigest; } @Override public String toString() { - return "Missing digest: " + ContentDigests.toString(missingDigest); + return "Missing digest: " + missingDigest; } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java index 87da7c2405..62605e1c2d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java @@ -13,28 +13,27 @@ // limitations under the License. package com.google.devtools.build.lib.remote; -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.Spawn; +import com.google.devtools.build.lib.actions.Spawns; import com.google.devtools.build.lib.actions.UserExecException; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.exec.SpawnResult; import com.google.devtools.build.lib.exec.SpawnResult.Status; import com.google.devtools.build.lib.exec.SpawnRunner; -import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.RemoteProtocol.Action; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.Command; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.Platform; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; +import com.google.devtools.remoteexecution.v1test.Action; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Command; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.Platform; +import com.google.protobuf.Duration; import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat.ParseException; import io.grpc.StatusRuntimeException; @@ -56,14 +55,11 @@ final class CachedLocalSpawnRunner implements SpawnRunner { // TODO(olaola): This will be set on a per-action basis instead. private final Platform platform; - private final RemoteActionCache actionCache; + private final RemoteActionCache remoteCache; private final SpawnRunner delegate; CachedLocalSpawnRunner( - Path execRoot, - RemoteOptions options, - RemoteActionCache actionCache, - SpawnRunner delegate) { + Path execRoot, RemoteOptions options, RemoteActionCache remoteCache, SpawnRunner delegate) { this.execRoot = execRoot; this.options = options; if (options.experimentalRemotePlatformOverride != null) { @@ -77,15 +73,13 @@ final class CachedLocalSpawnRunner implements SpawnRunner { } else { platform = null; } - this.actionCache = actionCache; + this.remoteCache = remoteCache; this.delegate = delegate; } @Override - public SpawnResult exec( - Spawn spawn, - SpawnExecutionPolicy policy) - throws InterruptedException, IOException, ExecException { + public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy) + throws InterruptedException, IOException, ExecException { ActionKey actionKey = null; String mnemonic = spawn.getMnemonic(); @@ -100,24 +94,26 @@ final class CachedLocalSpawnRunner implements SpawnRunner { Action action = buildAction( spawn.getOutputFiles(), - ContentDigests.computeDigest(command), - repository.getMerkleDigest(inputRoot)); + Digests.computeDigest(command), + repository.getMerkleDigest(inputRoot), + // TODO(olaola): set sensible local and remote timeouts. + Spawns.getTimeoutSeconds(spawn, 120)); // Look up action cache, and reuse the action output if it is found. - actionKey = ContentDigests.computeActionKey(action); + actionKey = Digests.computeActionKey(action); ActionResult result = - this.options.remoteAcceptCached ? actionCache.getCachedActionResult(actionKey) : null; + this.options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null; if (result != null) { // We don't cache failed actions, so we know the outputs exist. // For now, download all outputs locally; in the future, we can reuse the digests to // just update the TreeNodeRepository and continue the build. try { // TODO(ulfjack): Download stdout, stderr, and the output files in a single call. - actionCache.downloadAllResults(result, execRoot); + remoteCache.downloadAllResults(result, execRoot); passRemoteOutErr(result, policy.getFileOutErr()); return new SpawnResult.Builder() .setStatus(Status.SUCCESS) - .setExitCode(result.getReturnCode()) + .setExitCode(result.getExitCode()) .build(); } catch (CacheNotFoundException e) { // TODO(ulfjack): Track down who throws this exception in what cases and double-check that @@ -138,44 +134,62 @@ final class CachedLocalSpawnRunner implements SpawnRunner { } private Action buildAction( - Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) { + Collection<? extends ActionInput> outputs, + Digest command, + Digest inputRoot, + long timeoutSeconds) { Action.Builder action = Action.newBuilder(); action.setCommandDigest(command); action.setInputRootDigest(inputRoot); // Somewhat ugly: we rely on the stable order of outputs here for remote action caching. for (ActionInput output : outputs) { - action.addOutputPath(output.getExecPathString()); + // TODO: output directories should be handled here, when they are supported. + action.addOutputFiles(output.getExecPathString()); } if (platform != null) { action.setPlatform(platform); } + action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds)); return action.build(); } - private static Command buildCommand( - List<String> arguments, ImmutableMap<String, String> environment) { + private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) { Command.Builder command = Command.newBuilder(); - command.addAllArgv(arguments); + command.addAllArguments(arguments); // Sorting the environment pairs by variable name. TreeSet<String> variables = new TreeSet<>(environment.keySet()); for (String var : variables) { - command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var)); + command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var)); } return command.build(); } - private void passRemoteOutErr( - ActionResult result, FileOutErr outErr) - throws CacheNotFoundException { - ImmutableList<byte[]> streams = - actionCache.downloadBlobs( - ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest())); - outErr.printOut(new String(streams.get(0), UTF_8)); - outErr.printErr(new String(streams.get(1), UTF_8)); + private void passRemoteOutErr(ActionResult result, FileOutErr outErr) throws IOException { + try { + if (!result.getStdoutRaw().isEmpty()) { + result.getStdoutRaw().writeTo(outErr.getOutputStream()); + outErr.getOutputStream().flush(); + } else if (result.hasStdoutDigest()) { + byte[] stdoutBytes = remoteCache.downloadBlob(result.getStdoutDigest()); + outErr.getOutputStream().write(stdoutBytes); + outErr.getOutputStream().flush(); + } + if (!result.getStderrRaw().isEmpty()) { + result.getStderrRaw().writeTo(outErr.getErrorStream()); + outErr.getErrorStream().flush(); + } else if (result.hasStderrDigest()) { + byte[] stderrBytes = remoteCache.downloadBlob(result.getStderrDigest()); + outErr.getErrorStream().write(stderrBytes); + outErr.getErrorStream().flush(); + } + } catch (CacheNotFoundException e) { + outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss."); + outErr.getOutputStream().flush(); + } } private void writeCacheEntry(Spawn spawn, FileOutErr outErr, ActionKey actionKey) - throws IOException, InterruptedException { + throws IOException, InterruptedException { ArrayList<Path> outputFiles = new ArrayList<>(); for (ActionInput output : spawn.getOutputFiles()) { Path outputPath = execRoot.getRelative(output.getExecPathString()); @@ -186,11 +200,11 @@ final class CachedLocalSpawnRunner implements SpawnRunner { } } ActionResult.Builder result = ActionResult.newBuilder(); - actionCache.uploadAllResults(execRoot, outputFiles, result); - ContentDigest stderr = actionCache.uploadFileContents(outErr.getErrorPath()); - ContentDigest stdout = actionCache.uploadFileContents(outErr.getOutputPath()); + remoteCache.uploadAllResults(execRoot, outputFiles, result); + Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath()); + Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath()); result.setStderrDigest(stderr); result.setStdoutDigest(stdout); - actionCache.setCachedActionResult(actionKey, result.build()); + remoteCache.setCachedActionResult(actionKey, result.build()); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java index c2654586bf..a78458d135 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; +import com.google.devtools.common.options.Options; import io.grpc.CallCredentials; import io.grpc.auth.MoreCallCredentials; import io.grpc.netty.GrpcSslContexts; @@ -33,24 +34,21 @@ import javax.net.ssl.SSLException; /** Instantiate all authentication helpers from build options. */ @ThreadSafe public final class ChannelOptions { - private final int maxMessageSize; private final boolean tlsEnabled; private final SslContext sslContext; private final String tlsAuthorityOverride; private final CallCredentials credentials; - private static final int CHUNK_MESSAGE_OVERHEAD = 1024; + public static final ChannelOptions DEFAULT = create(Options.getDefaults(AuthAndTLSOptions.class)); private ChannelOptions( boolean tlsEnabled, SslContext sslContext, String tlsAuthorityOverride, - CallCredentials credentials, - int maxMessageSize) { + CallCredentials credentials) { this.tlsEnabled = tlsEnabled; this.sslContext = sslContext; this.tlsAuthorityOverride = tlsAuthorityOverride; this.credentials = credentials; - this.maxMessageSize = maxMessageSize; } public boolean tlsEnabled() { @@ -69,15 +67,10 @@ public final class ChannelOptions { return sslContext; } - public int maxMessageSize() { - return maxMessageSize; - } - - public static ChannelOptions create(AuthAndTLSOptions options, int grpcMaxChunkSizeBytes) { + public static ChannelOptions create(AuthAndTLSOptions options) { try { return create( options, - grpcMaxChunkSizeBytes, options.authCredentials != null ? new FileInputStream(options.authCredentials) : null); @@ -88,7 +81,8 @@ public final class ChannelOptions { } @VisibleForTesting - public static ChannelOptions create(AuthAndTLSOptions options, int grpcMaxChunkSizeBytes, + public static ChannelOptions create( + AuthAndTLSOptions options, @Nullable InputStream credentialsInputStream) { boolean tlsEnabled = options.tlsEnabled; SslContext sslContext = null; @@ -118,11 +112,7 @@ public final class ChannelOptions { "Failed initializing auth credentials for remote cache/execution " + e); } } - final int maxMessageSize = - Math.max( - 4 * 1024 * 1024 /* GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE */, - grpcMaxChunkSizeBytes + CHUNK_MESSAGE_OVERHEAD); return new ChannelOptions( - tlsEnabled, sslContext, tlsAuthorityOverride, credentials, maxMessageSize); + tlsEnabled, sslContext, tlsAuthorityOverride, credentials); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java index a9bdd6dc13..0c407a29b7 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java @@ -14,114 +14,162 @@ package com.google.devtools.build.lib.remote; -import com.google.common.collect.ImmutableSet; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.actions.cache.VirtualActionInput; -import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.vfs.Path; -import com.google.protobuf.ByteString; +import com.google.devtools.remoteexecution.v1test.Digest; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Iterator; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; -import javax.annotation.Nullable; -/** An iterator-type object that transforms byte sources into a stream of BlobChunk messages. */ +/** An iterator-type object that transforms byte sources into a stream of Chunks. */ public final class Chunker { + // This is effectively final, should be changed only in unit-tests! + private static int DEFAULT_CHUNK_SIZE = 1024 * 16; + private static byte[] EMPTY_BLOB = new byte[0]; + + @VisibleForTesting + static void setDefaultChunkSizeForTesting(int value) { + DEFAULT_CHUNK_SIZE = value; + } + + public static int getDefaultChunkSize() { + return DEFAULT_CHUNK_SIZE; + } + + /** A piece of a byte[] blob. */ + public static final class Chunk { + + private final Digest digest; + private final long offset; + // TODO(olaola): consider saving data in a different format that byte[]. + private final byte[] data; + + @VisibleForTesting + public Chunk(Digest digest, byte[] data, long offset) { + this.digest = digest; + this.data = data; + this.offset = offset; + } + + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (!(o instanceof Chunk)) { + return false; + } + Chunk other = (Chunk) o; + return other.offset == offset + && other.digest.equals(digest) + && Arrays.equals(other.data, data); + } + + @Override + public int hashCode() { + return Objects.hash(digest, offset, Arrays.hashCode(data)); + } + + public Digest getDigest() { + return digest; + } + + public long getOffset() { + return offset; + } + + // This returns a mutable copy, for efficiency. + public byte[] getData() { + return data; + } + } + /** An Item is an opaque digestable source of bytes. */ interface Item { - ContentDigest getDigest() throws IOException; + Digest getDigest() throws IOException; InputStream getInputStream() throws IOException; } private final Iterator<Item> inputIterator; private InputStream currentStream; - private final Set<ContentDigest> digests; - private ContentDigest digest; + private Digest digest; private long bytesLeft; private final int chunkSize; - Chunker( - Iterator<Item> inputIterator, - int chunkSize, - // If present, specifies which digests to output out of the whole input. - @Nullable Set<ContentDigest> digests) - throws IOException { + Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException { Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0"); - this.digests = digests; this.inputIterator = inputIterator; this.chunkSize = chunkSize; advanceInput(); } - Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException { - this(inputIterator, chunkSize, null); + Chunker(Item input, int chunkSize) throws IOException { + this(Iterators.singletonIterator(input), chunkSize); } - Chunker(Item input, int chunkSize) throws IOException { - this(Iterators.singletonIterator(input), chunkSize, ImmutableSet.of(input.getDigest())); - } - - private void advanceInput() throws IOException { - do { - if (inputIterator != null && inputIterator.hasNext()) { - Item input = inputIterator.next(); - digest = input.getDigest(); - currentStream = input.getInputStream(); - bytesLeft = digest.getSizeBytes(); - } else { - digest = null; - currentStream = null; - bytesLeft = 0; - } - } while (digest != null && digests != null && !digests.contains(digest)); + public void advanceInput() throws IOException { + if (inputIterator != null && inputIterator.hasNext()) { + Item input = inputIterator.next(); + digest = input.getDigest(); + currentStream = input.getInputStream(); + bytesLeft = digest.getSizeBytes(); + } else { + digest = null; + currentStream = null; + bytesLeft = 0; + } } - /** True if the object has more BlobChunk elements. */ + /** True if the object has more Chunk elements. */ public boolean hasNext() { return currentStream != null; } - /** Consume the next BlobChunk element. */ - public BlobChunk next() throws IOException { + /** Consume the next Chunk element. */ + public Chunk next() throws IOException { if (!hasNext()) { throw new NoSuchElementException(); } - BlobChunk.Builder chunk = BlobChunk.newBuilder(); - long offset = digest.getSizeBytes() - bytesLeft; - if (offset == 0) { - chunk.setDigest(digest); - } else { - chunk.setOffset(offset); - } + final long offset = digest.getSizeBytes() - bytesLeft; + byte[] blob = EMPTY_BLOB; if (bytesLeft > 0) { - byte[] blob = new byte[(int) Math.min(bytesLeft, chunkSize)]; + blob = new byte[(int) Math.min(bytesLeft, chunkSize)]; currentStream.read(blob); - chunk.setData(ByteString.copyFrom(blob)); bytesLeft -= blob.length; } + final Chunk result = new Chunk(digest, blob, offset); if (bytesLeft == 0) { currentStream.close(); advanceInput(); // Sets the current stream to null, if it was the last. } - return chunk.build(); + return result; } static Item toItem(final byte[] blob) { return new Item() { + Digest digest = null; + @Override - public ContentDigest getDigest() throws IOException { - return ContentDigests.computeDigest(blob); + public Digest getDigest() throws IOException { + if (digest == null) { + digest = Digests.computeDigest(blob); + } + return digest; } @Override @@ -133,9 +181,14 @@ public final class Chunker { static Item toItem(final Path file) { return new Item() { + Digest digest = null; + @Override - public ContentDigest getDigest() throws IOException { - return ContentDigests.computeDigest(file); + public Digest getDigest() throws IOException { + if (digest == null) { + digest = Digests.computeDigest(file); + } + return digest; } @Override @@ -152,8 +205,8 @@ public final class Chunker { } return new Item() { @Override - public ContentDigest getDigest() throws IOException { - return ContentDigests.getDigestFromInputCache(input, inputCache); + public Digest getDigest() throws IOException { + return Digests.getDigestFromInputCache(input, inputCache); } @Override @@ -165,9 +218,14 @@ public final class Chunker { static Item toItem(final VirtualActionInput input) { return new Item() { + Digest digest = null; + @Override - public ContentDigest getDigest() throws IOException { - return ContentDigests.computeDigest(input); + public Digest getDigest() throws IOException { + if (digest == null) { + digest = Digests.computeDigest(input); + } + return digest; } @Override @@ -189,27 +247,67 @@ public final class Chunker { return new Chunker(toItem(input, inputCache, execRoot), chunkSize); } + /** + * Create a Chunker from a given ActionInput, taking its digest from the provided + * ActionInputFileCache. + */ + public static Chunker from(ActionInput input, ActionInputFileCache inputCache, Path execRoot) + throws IOException { + return from(input, getDefaultChunkSize(), inputCache, execRoot); + } + /** Create a Chunker from a given blob and chunkSize. */ public static Chunker from(byte[] blob, int chunkSize) throws IOException { return new Chunker(toItem(blob), chunkSize); } + /** Create a Chunker from a given blob. */ + public static Chunker from(byte[] blob) throws IOException { + return from(blob, getDefaultChunkSize()); + } + /** Create a Chunker from a given Path and chunkSize. */ public static Chunker from(Path file, int chunkSize) throws IOException { return new Chunker(toItem(file), chunkSize); } + /** Create a Chunker from a given Path. */ + public static Chunker from(Path file) throws IOException { + return from(file, getDefaultChunkSize()); + } + + static class MemberOf implements Predicate<Item> { + private final Set<Digest> digests; + + public MemberOf(Set<Digest> digests) { + this.digests = digests; + } + + @Override + public boolean apply(Item item) { + try { + return digests.contains(item.getDigest()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + /** * Create a Chunker from multiple input sources. The order of the sources provided to the Builder * will be the same order they will be chunked by. */ public static final class Builder { private final ArrayList<Item> items = new ArrayList<>(); - private Set<ContentDigest> digests = null; - private int chunkSize = 0; + private Set<Digest> digests = null; + private int chunkSize = getDefaultChunkSize(); public Chunker build() throws IOException { - return new Chunker(items.iterator(), chunkSize, digests); + return new Chunker( + digests == null + ? items.iterator() + : Iterators.filter(items.iterator(), new MemberOf(digests)), + chunkSize); } public Builder chunkSize(int chunkSize) { @@ -221,7 +319,7 @@ public final class Chunker { * Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS * uploads where a list of digests missing from the CAS is known. */ - public Builder onlyUseDigests(Set<ContentDigest> digests) { + public Builder onlyUseDigests(Set<Digest> digests) { this.digests = digests; return this; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java b/src/main/java/com/google/devtools/build/lib/remote/Digests.java index 21a739c468..9121bc2782 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java +++ b/src/main/java/com/google/devtools/build/lib/remote/Digests.java @@ -14,34 +14,35 @@ package com.google.devtools.build.lib.remote; +import static java.nio.charset.StandardCharsets.UTF_8; + import com.google.common.hash.HashCode; import com.google.common.hash.Hashing; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.RemoteProtocol.Action; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; import com.google.devtools.build.lib.vfs.Path; -import com.google.protobuf.ByteString; +import com.google.devtools.remoteexecution.v1test.Action; +import com.google.devtools.remoteexecution.v1test.Digest; import com.google.protobuf.Message; import java.io.ByteArrayOutputStream; import java.io.IOException; -/** Helper methods relating to computing ContentDigest messages for remote execution. */ +/** Helper methods relating to computing Digest messages for remote execution. */ @ThreadSafe -public final class ContentDigests { - private ContentDigests() {} +public final class Digests { + private Digests() {} - public static ContentDigest computeDigest(byte[] blob) { - return buildDigest(Hashing.sha1().hashBytes(blob).asBytes(), blob.length); + public static Digest computeDigest(byte[] blob) { + return buildDigest(Hashing.sha1().hashBytes(blob).toString(), blob.length); } - public static ContentDigest computeDigest(Path file) throws IOException { + public static Digest computeDigest(Path file) throws IOException { return buildDigest(file.getSHA1Digest(), file.getFileSize()); } - public static ContentDigest computeDigest(VirtualActionInput input) throws IOException { + public static Digest computeDigest(VirtualActionInput input) throws IOException { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); input.writeTo(buffer); return computeDigest(buffer.toByteArray()); @@ -52,22 +53,26 @@ public final class ContentDigests { * bytes, but this implementation relies on the stability of the proto encoding, in particular * between different platforms and languages. TODO(olaola): upgrade to a better implementation! */ - public static ContentDigest computeDigest(Message message) { + public static Digest computeDigest(Message message) { return computeDigest(message.toByteArray()); } + public static Digest computeDigestUtf8(String str) { + return computeDigest(str.getBytes(UTF_8)); + } + /** - * A special type of ContentDigest that is used only as a remote action cache key. This is a - * separate type in order to prevent accidentally using other ContentDigests as action keys. + * A special type of Digest that is used only as a remote action cache key. This is a + * separate type in order to prevent accidentally using other Digests as action keys. */ public static final class ActionKey { - private final ContentDigest digest; + private final Digest digest; - public ContentDigest getDigest() { + public Digest getDigest() { return digest; } - private ActionKey(ContentDigest digest) { + private ActionKey(Digest digest) { this.digest = digest; } } @@ -77,29 +82,23 @@ public final class ContentDigests { } /** - * Assumes that the given ContentDigest is a valid digest of an Action, and creates an ActionKey + * Assumes that the given Digest is a valid digest of an Action, and creates an ActionKey * wrapper. This should not be called on the client side! */ - public static ActionKey unsafeActionKeyFromDigest(ContentDigest digest) { + public static ActionKey unsafeActionKeyFromDigest(Digest digest) { return new ActionKey(digest); } - public static ContentDigest buildDigest(byte[] digest, long size) { - ContentDigest.Builder b = ContentDigest.newBuilder(); - b.setDigest(ByteString.copyFrom(digest)).setSizeBytes(size); - return b.build(); + public static Digest buildDigest(byte[] hash, long size) { + return buildDigest(HashCode.fromBytes(hash).toString(), size); } - public static ContentDigest getDigestFromInputCache(ActionInput input, ActionInputFileCache cache) - throws IOException { - return buildDigest(cache.getDigest(input), cache.getSizeInBytes(input)); + public static Digest buildDigest(String hexHash, long size) { + return Digest.newBuilder().setHash(hexHash).setSizeBytes(size).build(); } - public static String toHexString(ContentDigest digest) { - return HashCode.fromBytes(digest.getDigest().toByteArray()).toString(); - } - - public static String toString(ContentDigest digest) { - return "<digest: " + toHexString(digest) + ", size: " + digest.getSizeBytes() + " bytes>"; + public static Digest getDigestFromInputCache(ActionInput input, ActionInputFileCache cache) + throws IOException { + return buildDigest(cache.getDigest(input), cache.getSizeInBytes(input)); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java index d85125f15c..d93842244d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java @@ -14,52 +14,53 @@ package com.google.devtools.build.lib.remote; +import com.google.bytestream.ByteStreamGrpc; +import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub; +import com.google.bytestream.ByteStreamGrpc.ByteStreamStub; +import com.google.bytestream.ByteStreamProto.ReadRequest; +import com.google.bytestream.ByteStreamProto.ReadResponse; +import com.google.bytestream.ByteStreamProto.WriteRequest; +import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; -import com.google.devtools.build.lib.remote.RemoteProtocol.Output; -import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; -import com.google.devtools.build.lib.util.Pair; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc; +import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlockingStub; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest; +import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse; +import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc; +import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.Directory; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse; +import com.google.devtools.remoteexecution.v1test.GetActionResultRequest; +import com.google.devtools.remoteexecution.v1test.OutputDirectory; +import com.google.devtools.remoteexecution.v1test.OutputFile; +import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest; import com.google.protobuf.ByteString; import io.grpc.Channel; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; -import java.util.Map; -import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -67,50 +68,76 @@ import java.util.concurrent.atomic.AtomicReference; /** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */ @ThreadSafe public class GrpcActionCache implements RemoteActionCache { - private static final int MAX_MEMORY_KBYTES = 512 * 1024; - - /** Channel over which to send gRPC CAS queries. */ - private final GrpcCasInterface casIface; - - private final GrpcExecutionCacheInterface iface; private final RemoteOptions options; - - public GrpcActionCache( - RemoteOptions options, GrpcCasInterface casIface, GrpcExecutionCacheInterface iface) { - this.options = options; - this.casIface = casIface; - this.iface = iface; - } + private final ChannelOptions channelOptions; + private final Channel channel; @VisibleForTesting - public GrpcActionCache( - Channel channel, RemoteOptions options, ChannelOptions channelOptions) { + public GrpcActionCache(Channel channel, ChannelOptions channelOptions, RemoteOptions options) { this.options = options; - this.casIface = - GrpcInterfaces.casInterface(options.remoteTimeout, channel, channelOptions); - this.iface = - GrpcInterfaces.executionCacheInterface(options.remoteTimeout, channel, channelOptions); + this.channelOptions = channelOptions; + this.channel = channel; } - public GrpcActionCache(RemoteOptions options, ChannelOptions channelOptions) { - this(RemoteUtils.createChannel(options.remoteCache, channelOptions), options, channelOptions); - } + // All gRPC stubs are reused. + private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub = + Suppliers.memoize( + new Supplier<ContentAddressableStorageBlockingStub>() { + @Override + public ContentAddressableStorageBlockingStub get() { + return ContentAddressableStorageGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + }); + + private final Supplier<ByteStreamBlockingStub> bsBlockingStub = + Suppliers.memoize( + new Supplier<ByteStreamBlockingStub>() { + @Override + public ByteStreamBlockingStub get() { + return ByteStreamGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + }); + + private final Supplier<ByteStreamStub> bsStub = + Suppliers.memoize( + new Supplier<ByteStreamStub>() { + @Override + public ByteStreamStub get() { + return ByteStreamGrpc.newStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + }); + + private final Supplier<ActionCacheBlockingStub> acBlockingStub = + Suppliers.memoize( + new Supplier<ActionCacheBlockingStub>() { + @Override + public ActionCacheBlockingStub get() { + return ActionCacheGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS); + } + }); public static boolean isRemoteCacheOptions(RemoteOptions options) { return options.remoteCache != null; } - private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) { - CasLookupRequest.Builder request = CasLookupRequest.newBuilder().addAllDigest(digests); - if (request.getDigestCount() == 0) { + private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests) { + FindMissingBlobsRequest.Builder request = + FindMissingBlobsRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .addAllBlobDigests(digests); + if (request.getBlobDigestsCount() == 0) { return ImmutableSet.of(); } - CasStatus status = casIface.lookup(request.build()).getStatus(); - if (!status.getSucceeded() && status.getError() != CasStatus.ErrorCode.MISSING_DIGEST) { - // TODO(olaola): here and below, add basic retry logic on transient errors! - throw new RuntimeException(status.getErrorDetail()); - } - return ImmutableSet.copyOf(status.getMissingDigestList()); + FindMissingBlobsResponse response = casBlockingStub.get().findMissingBlobs(request.build()); + return ImmutableSet.copyOf(response.getMissingBlobDigestsList()); } /** @@ -122,26 +149,37 @@ public class GrpcActionCache implements RemoteActionCache { throws IOException, InterruptedException { repository.computeMerkleDigests(root); // TODO(olaola): avoid querying all the digests, only ask for novel subtrees. - ImmutableSet<ContentDigest> missingDigests = getMissingDigests(repository.getAllDigests(root)); + ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root)); // Only upload data that was missing from the cache. ArrayList<ActionInput> actionInputs = new ArrayList<>(); - ArrayList<FileNode> treeNodes = new ArrayList<>(); + ArrayList<Directory> treeNodes = new ArrayList<>(); repository.getDataFromDigests(missingDigests, actionInputs, treeNodes); if (!treeNodes.isEmpty()) { - CasUploadTreeMetadataRequest.Builder metaRequest = - CasUploadTreeMetadataRequest.newBuilder().addAllTreeNode(treeNodes); - CasUploadTreeMetadataReply reply = casIface.uploadTreeMetadata(metaRequest.build()); - if (!reply.getStatus().getSucceeded()) { - throw new RuntimeException(reply.getStatus().getErrorDetail()); + // TODO(olaola): split this into multiple requests if total size is > 10MB. + BatchUpdateBlobsRequest.Builder treeBlobRequest = + BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName); + for (Directory d : treeNodes) { + final byte[] data = d.toByteArray(); + treeBlobRequest + .addRequestsBuilder() + .setContentDigest(Digests.computeDigest(data)) + .setData(ByteString.copyFrom(data)); + } + BatchUpdateBlobsResponse response = + casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build()); + // TODO(olaola): handle retries on transient errors. + for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) { + if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) { + throw StatusProto.toStatusRuntimeException(r.getStatus()); + } } } if (!actionInputs.isEmpty()) { uploadChunks( actionInputs.size(), new Chunker.Builder() - .chunkSize(options.grpcMaxChunkSizeBytes) .addAllInputs(actionInputs, repository.getInputFileCache(), execRoot) .onlyUseDigests(missingDigests) .build()); @@ -152,21 +190,11 @@ public class GrpcActionCache implements RemoteActionCache { * Download the entire tree data rooted by the given digest and write it into the given location. */ @Override - public void downloadTree(ContentDigest rootDigest, Path rootLocation) + public void downloadTree(Digest rootDigest, Path rootLocation) throws IOException, CacheNotFoundException { throw new UnsupportedOperationException(); } - private void handleDownloadStatus(CasStatus status) throws CacheNotFoundException { - if (!status.getSucceeded()) { - if (status.getError() == CasStatus.ErrorCode.MISSING_DIGEST) { - throw new CacheNotFoundException(status.getMissingDigest(0)); - } - // TODO(olaola): deal with other statuses better. - throw new RuntimeException(status.getErrorDetail()); - } - } - /** * Download all results of a remotely executed action locally. TODO(olaola): will need to amend to * include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating. @@ -174,106 +202,85 @@ public class GrpcActionCache implements RemoteActionCache { @Override public void downloadAllResults(ActionResult result, Path execRoot) throws IOException, CacheNotFoundException { - if (result.getOutputList().isEmpty()) { + if (result.getOutputFilesList().isEmpty() && result.getOutputDirectoriesList().isEmpty()) { return; } - // Send all the file requests in a single synchronous batch. - // TODO(olaola): profile to maybe replace with separate concurrent requests. - CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder(); - Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap = new HashMap<>(); - for (Output output : result.getOutputList()) { - Path path = execRoot.getRelative(output.getPath()); - if (output.getContentCase() == ContentCase.FILE_METADATA) { - FileMetadata fileMetadata = output.getFileMetadata(); - ContentDigest digest = fileMetadata.getDigest(); - if (digest.getSizeBytes() > 0) { - request.addDigest(digest); - metadataMap.put(digest, Pair.of(path, fileMetadata)); - } else { - // Handle empty file locally. - FileSystemUtils.createDirectoryAndParents(path.getParentDirectory()); - FileSystemUtils.writeContent(path, new byte[0]); - } + for (OutputFile file : result.getOutputFilesList()) { + Path path = execRoot.getRelative(file.getPath()); + FileSystemUtils.createDirectoryAndParents(path.getParentDirectory()); + Digest digest = file.getDigest(); + if (digest.getSizeBytes() == 0) { + // Handle empty file locally. + FileSystemUtils.writeContent(path, new byte[0]); } else { - downloadTree(output.getDigest(), path); + try (OutputStream stream = path.getOutputStream()) { + if (!file.getContent().isEmpty()) { + file.getContent().writeTo(stream); + } else { + Iterator<ReadResponse> replies = readBlob(digest); + while (replies.hasNext()) { + replies.next().getData().writeTo(stream); + } + } + } } + path.setExecutable(file.getIsExecutable()); } - Iterator<CasDownloadReply> replies = casIface.downloadBlob(request.build()); - Set<ContentDigest> results = new HashSet<>(); - while (replies.hasNext()) { - results.add(createFileFromStream(metadataMap, replies)); - } - for (ContentDigest digest : metadataMap.keySet()) { - if (!results.contains(digest)) { - throw new CacheNotFoundException(digest); - } + for (OutputDirectory directory : result.getOutputDirectoriesList()) { + downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath())); } } - private ContentDigest createFileFromStream( - Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap, Iterator<CasDownloadReply> replies) - throws IOException, CacheNotFoundException { - Preconditions.checkArgument(replies.hasNext()); - CasDownloadReply reply = replies.next(); - if (reply.hasStatus()) { - handleDownloadStatus(reply.getStatus()); + private Iterator<ReadResponse> readBlob(Digest digest) throws CacheNotFoundException { + String resourceName = ""; + if (!options.remoteInstanceName.isEmpty()) { + resourceName += options.remoteInstanceName + "/"; } - BlobChunk chunk = reply.getData(); - ContentDigest digest = chunk.getDigest(); - Preconditions.checkArgument(metadataMap.containsKey(digest)); - Pair<Path, FileMetadata> metadata = metadataMap.get(digest); - Path path = metadata.first; - FileSystemUtils.createDirectoryAndParents(path.getParentDirectory()); - try (OutputStream stream = path.getOutputStream()) { - ByteString data = chunk.getData(); - data.writeTo(stream); - long bytesLeft = digest.getSizeBytes() - data.size(); - while (bytesLeft > 0) { - Preconditions.checkArgument(replies.hasNext()); - reply = replies.next(); - if (reply.hasStatus()) { - handleDownloadStatus(reply.getStatus()); - } - chunk = reply.getData(); - data = chunk.getData(); - Preconditions.checkArgument(!chunk.hasDigest()); - Preconditions.checkArgument(chunk.getOffset() == digest.getSizeBytes() - bytesLeft); - data.writeTo(stream); - bytesLeft -= data.size(); + resourceName += "blobs/" + digest.getHash() + "/" + digest.getSizeBytes(); + try { + return bsBlockingStub + .get() + .read(ReadRequest.newBuilder().setResourceName(resourceName).build()); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.NOT_FOUND) { + throw new CacheNotFoundException(digest); } - path.setExecutable(metadata.second.getExecutable()); + throw e; } - return digest; } /** Upload all results of a locally executed action to the cache. */ @Override public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result) throws IOException, InterruptedException { - ArrayList<ContentDigest> digests = new ArrayList<>(); - Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes); + ArrayList<Digest> digests = new ArrayList<>(); + Chunker.Builder b = new Chunker.Builder(); for (Path file : files) { - digests.add(ContentDigests.computeDigest(file)); + if (!file.exists()) { + // We ignore requested results that have not been generated by the action. + continue; + } + if (file.isDirectory()) { + // TODO(olaola): to implement this for a directory, will need to create or pass a + // TreeNodeRepository to call uploadTree. + throw new UnsupportedOperationException("Storing a directory is not yet supported."); + } + digests.add(Digests.computeDigest(file)); b.addInput(file); } - ImmutableSet<ContentDigest> missing = getMissingDigests(digests); + ImmutableSet<Digest> missing = getMissingDigests(digests); if (!missing.isEmpty()) { uploadChunks(missing.size(), b.onlyUseDigests(missing).build()); } int index = 0; for (Path file : files) { - if (file.isDirectory()) { - // TODO(olaola): to implement this for a directory, will need to create or pass a - // TreeNodeRepository to call uploadTree. - throw new UnsupportedOperationException("Storing a directory is not yet supported."); - } // Add to protobuf. + // TODO(olaola): inline small results here. result - .addOutputBuilder() + .addOutputFilesBuilder() .setPath(file.relativeTo(execRoot).getPathString()) - .getFileMetadataBuilder() .setDigest(digests.get(index++)) - .setExecutable(file.isExecutable()); + .setIsExecutable(file.isExecutable()); } } @@ -284,11 +291,11 @@ public class GrpcActionCache implements RemoteActionCache { * @return The key for fetching the file contents blob from cache. */ @Override - public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException { - ContentDigest digest = ContentDigests.computeDigest(file); - ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest)); + public Digest uploadFileContents(Path file) throws IOException, InterruptedException { + Digest digest = Digests.computeDigest(file); + ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploadChunks(1, Chunker.from(file, options.grpcMaxChunkSizeBytes)); + uploadChunks(1, Chunker.from(file)); } return digest; } @@ -300,97 +307,84 @@ public class GrpcActionCache implements RemoteActionCache { * @return The key for fetching the file contents blob from cache. */ @Override - public ContentDigest uploadFileContents( + public Digest uploadFileContents( ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException, InterruptedException { - ContentDigest digest = ContentDigests.getDigestFromInputCache(input, inputCache); - ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest)); + Digest digest = Digests.getDigestFromInputCache(input, inputCache); + ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); if (!missing.isEmpty()) { - uploadChunks(1, Chunker.from(input, options.grpcMaxChunkSizeBytes, inputCache, execRoot)); + uploadChunks(1, Chunker.from(input, inputCache, execRoot)); } return digest; } - static class UploadBlobReplyStreamObserver implements StreamObserver<CasUploadBlobReply> { - private final CountDownLatch finishLatch; - private final AtomicReference<RuntimeException> exception; - - public UploadBlobReplyStreamObserver( - CountDownLatch finishLatch, AtomicReference<RuntimeException> exception) { - this.finishLatch = finishLatch; - this.exception = exception; - } - - @Override - public void onNext(CasUploadBlobReply reply) { - if (!reply.getStatus().getSucceeded()) { - // TODO(olaola): add basic retry logic on transient errors! - this.exception.compareAndSet( - null, new RuntimeException(reply.getStatus().getErrorDetail())); + private void uploadChunks(int numItems, Chunker chunker) + throws InterruptedException, IOException { + final CountDownLatch finishLatch = new CountDownLatch(numItems); + final AtomicReference<RuntimeException> exception = new AtomicReference<>(null); + StreamObserver<WriteRequest> requestObserver = null; + String resourceName = ""; + if (!options.remoteInstanceName.isEmpty()) { + resourceName += options.remoteInstanceName + "/"; + } + while (chunker.hasNext()) { + Chunker.Chunk chunk = chunker.next(); + final Digest digest = chunk.getDigest(); + long offset = chunk.getOffset(); + WriteRequest.Builder request = WriteRequest.newBuilder(); + if (offset == 0) { // Beginning of new upload. + numItems--; + request.setResourceName( + resourceName + + "uploads/" + + UUID.randomUUID() + + "/blobs/" + + digest.getHash() + + "/" + + digest.getSizeBytes()); + // The batches execute simultaneously. + requestObserver = + bsStub + .get() + .write( + new StreamObserver<WriteResponse>() { + private long bytesLeft = digest.getSizeBytes(); + + @Override + public void onNext(WriteResponse reply) { + bytesLeft -= reply.getCommittedSize(); + } + + @Override + public void onError(Throwable t) { + exception.compareAndSet( + null, new StatusRuntimeException(Status.fromThrowable(t))); + finishLatch.countDown(); + } + + @Override + public void onCompleted() { + if (bytesLeft != 0) { + exception.compareAndSet( + null, new RuntimeException("Server did not commit all data.")); + } + finishLatch.countDown(); + } + }); } - } - - @Override - public void onError(Throwable t) { - this.exception.compareAndSet(null, new StatusRuntimeException(Status.fromThrowable(t))); - finishLatch.countDown(); - } - - @Override - public void onCompleted() { - finishLatch.countDown(); - } - } - - private void uploadChunks(int numItems, Chunker blobs) throws InterruptedException, IOException { - CountDownLatch finishLatch = new CountDownLatch(numItems); // Maximal number of batches. - AtomicReference<RuntimeException> exception = new AtomicReference<>(null); - UploadBlobReplyStreamObserver responseObserver = null; - StreamObserver<CasUploadBlobRequest> requestObserver = null; - int currentBatchBytes = 0; - int batchedInputs = 0; - int batches = 0; - try { - while (blobs.hasNext()) { - BlobChunk chunk = blobs.next(); - if (chunk.hasDigest()) { - // Determine whether to start next batch. - final long batchSize = chunk.getDigest().getSizeBytes() + currentBatchBytes; - if (batchedInputs % options.grpcMaxBatchInputs == 0 - || batchSize > options.grpcMaxBatchSizeBytes) { - // The batches execute simultaneously. - if (requestObserver != null) { - batchedInputs = 0; - currentBatchBytes = 0; - requestObserver.onCompleted(); - } - batches++; - responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception); - requestObserver = casIface.uploadBlobAsync(responseObserver); - } - batchedInputs++; - } - currentBatchBytes += chunk.getData().size(); - requestObserver.onNext(CasUploadBlobRequest.newBuilder().setData(chunk).build()); - if (finishLatch.getCount() == 0) { - // RPC completed or errored before we finished sending. - throw new RuntimeException( - "gRPC terminated prematurely: " - + (exception.get() != null ? exception.get() : "unknown cause")); - } + byte[] data = chunk.getData(); + boolean finishWrite = offset + data.length == digest.getSizeBytes(); + request.setData(ByteString.copyFrom(data)).setWriteOffset(offset).setFinishWrite(finishWrite); + requestObserver.onNext(request.build()); + if (finishWrite) { + requestObserver.onCompleted(); } - } catch (RuntimeException e) { - // Cancel RPC - if (requestObserver != null) { - requestObserver.onError(e); + if (finishLatch.getCount() <= numItems) { + // Current RPC errored before we finished sending. + if (!finishWrite) { + chunker.advanceInput(); + } } - throw e; - } - if (requestObserver != null) { - requestObserver.onCompleted(); // Finish last batch. - } - while (batches++ < numItems) { - finishLatch.countDown(); // Non-sent batches. } finishLatch.await(options.remoteTimeout, TimeUnit.SECONDS); if (exception.get() != null) { @@ -399,33 +393,12 @@ public class GrpcActionCache implements RemoteActionCache { } @Override - public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) - throws InterruptedException { - ArrayList<ContentDigest> digests = new ArrayList<>(); - Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes); - for (byte[] blob : blobs) { - digests.add(ContentDigests.computeDigest(blob)); - b.addInput(blob); - } - ImmutableSet<ContentDigest> missing = getMissingDigests(digests); + public Digest uploadBlob(byte[] blob) throws InterruptedException { + Digest digest = Digests.computeDigest(blob); + ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest)); try { if (!missing.isEmpty()) { - uploadChunks(missing.size(), b.onlyUseDigests(missing).build()); - } - return ImmutableList.copyOf(digests); - } catch (IOException e) { - // This will never happen. - throw new RuntimeException(e); - } - } - - @Override - public ContentDigest uploadBlob(byte[] blob) throws InterruptedException { - ContentDigest digest = ContentDigests.computeDigest(blob); - ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest)); - try { - if (!missing.isEmpty()) { - uploadChunks(1, Chunker.from(blob, options.grpcMaxChunkSizeBytes)); + uploadChunks(1, Chunker.from(blob)); } return digest; } catch (IOException e) { @@ -435,69 +408,20 @@ public class GrpcActionCache implements RemoteActionCache { } @Override - public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException { - return downloadBlobs(ImmutableList.of(digest)).get(0); - } - - @Override - public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests) - throws CacheNotFoundException { - // Send all the file requests in a single synchronous batch. - // TODO(olaola): profile to maybe replace with separate concurrent requests. - CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder(); - for (ContentDigest digest : digests) { - if (digest.getSizeBytes() > 0) { - request.addDigest(digest); // We handle empty blobs locally. - } + public byte[] downloadBlob(Digest digest) throws CacheNotFoundException { + if (digest.getSizeBytes() == 0) { + return new byte[0]; } - Iterator<CasDownloadReply> replies = null; - Map<ContentDigest, byte[]> results = new HashMap<>(); - int digestCount = request.getDigestCount(); - if (digestCount > 0) { - replies = casIface.downloadBlob(request.build()); - while (digestCount-- > 0) { - Preconditions.checkArgument(replies.hasNext()); - CasDownloadReply reply = replies.next(); - if (reply.hasStatus()) { - handleDownloadStatus(reply.getStatus()); - } - BlobChunk chunk = reply.getData(); - ContentDigest digest = chunk.getDigest(); - // This is not enough, but better than nothing. - Preconditions.checkArgument(digest.getSizeBytes() / 1000.0 < MAX_MEMORY_KBYTES); - byte[] result = new byte[(int) digest.getSizeBytes()]; - ByteString data = chunk.getData(); - data.copyTo(result, 0); - int offset = data.size(); - while (offset < result.length) { - Preconditions.checkArgument(replies.hasNext()); - reply = replies.next(); - if (reply.hasStatus()) { - handleDownloadStatus(reply.getStatus()); - } - chunk = reply.getData(); - Preconditions.checkArgument(!chunk.hasDigest()); - Preconditions.checkArgument(chunk.getOffset() == offset); - data = chunk.getData(); - data.copyTo(result, offset); - offset += data.size(); - } - results.put(digest, result); - } - } - - ArrayList<byte[]> result = new ArrayList<>(); - for (ContentDigest digest : digests) { - if (digest.getSizeBytes() == 0) { - result.add(new byte[0]); - continue; - } - if (!results.containsKey(digest)) { - throw new CacheNotFoundException(digest); - } - result.add(results.get(digest)); + Iterator<ReadResponse> replies = readBlob(digest); + byte[] result = new byte[(int) digest.getSizeBytes()]; + int offset = 0; + while (replies.hasNext()) { + ByteString data = replies.next().getData(); + data.copyTo(result, offset); + offset += data.size(); } - return ImmutableList.copyOf(result); + Preconditions.checkState(digest.getSizeBytes() == offset); + return result; } // Execution Cache API @@ -505,30 +429,39 @@ public class GrpcActionCache implements RemoteActionCache { /** Returns a cached result for a given Action digest, or null if not found in cache. */ @Override public ActionResult getCachedActionResult(ActionKey actionKey) { - ExecutionCacheRequest request = - ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build(); - ExecutionCacheReply reply = iface.getCachedResult(request); - ExecutionCacheStatus status = reply.getStatus(); - if (!status.getSucceeded() - && status.getError() != ExecutionCacheStatus.ErrorCode.MISSING_RESULT) { - throw new RuntimeException(status.getErrorDetail()); + try { + return acBlockingStub + .get() + .getActionResult( + GetActionResultRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setActionDigest(actionKey.getDigest()) + .build()); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.NOT_FOUND) { + return null; + } + throw e; } - return reply.hasResult() ? reply.getResult() : null; } /** Sets the given result as result of the given Action. */ @Override public void setCachedActionResult(ActionKey actionKey, ActionResult result) throws InterruptedException { - ExecutionCacheSetRequest request = - ExecutionCacheSetRequest.newBuilder() - .setActionDigest(actionKey.getDigest()) - .setResult(result) - .build(); - ExecutionCacheSetReply reply = iface.setCachedResult(request); - ExecutionCacheStatus status = reply.getStatus(); - if (!status.getSucceeded() && status.getError() != ExecutionCacheStatus.ErrorCode.UNSUPPORTED) { - throw new RuntimeException(status.getErrorDetail()); + try { + acBlockingStub + .get() + .updateActionResult( + UpdateActionResultRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) + .setActionDigest(actionKey.getDigest()) + .setActionResult(result) + .build()); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) { + throw e; + } } } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java deleted file mode 100644 index 529ff9c3db..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote; - -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub; -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; -import io.grpc.stub.StreamObserver; -import java.util.Iterator; - -/** - * An abstraction layer between the remote execution client and gRPC to support unit testing. This - * interface covers the CAS RPC methods, see {@link CasServiceBlockingStub} and - * {@link CasServiceStub}. - */ -public interface GrpcCasInterface { - CasLookupReply lookup(CasLookupRequest request); - CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request); - CasDownloadTreeMetadataReply downloadTreeMetadata(CasDownloadTreeMetadataRequest request); - Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request); - StreamObserver<CasUploadBlobRequest> uploadBlobAsync( - StreamObserver<CasUploadBlobReply> responseObserver); -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java deleted file mode 100644 index 375c2ab359..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote; - -import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; - -/** - * An abstraction layer between the remote execution client and gRPC to support unit testing. This - * interface covers the execution cache RPC methods, see {@link ExecutionCacheServiceBlockingStub}. - */ -public interface GrpcExecutionCacheInterface { - ExecutionCacheReply getCachedResult(ExecutionCacheRequest request); - ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request); -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java deleted file mode 100644 index fd44792b4a..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java +++ /dev/null @@ -1,27 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote; - -import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; -import java.util.Iterator; - -/** - * An abstraction layer between the remote execution client and gRPC to support unit testing. This - * interface covers the remote execution RPC methods, see {@link ExecuteServiceBlockingStub}. - */ -public interface GrpcExecutionInterface { - Iterator<ExecuteReply> execute(ExecuteRequest request); -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java deleted file mode 100644 index 6e100ae1d0..0000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java +++ /dev/null @@ -1,131 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote; - -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub; -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub; -import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub; -import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; -import io.grpc.Channel; -import io.grpc.stub.StreamObserver; -import java.util.Iterator; -import java.util.concurrent.TimeUnit; - -/** Implementations of the gRPC interfaces that actually talk to gRPC. */ -public class GrpcInterfaces { - /** Create a {@link GrpcCasInterface} instance that actually talks to gRPC. */ - public static GrpcCasInterface casInterface( - final int grpcTimeoutSeconds, - final Channel channel, - final ChannelOptions channelOptions) { - return new GrpcCasInterface() { - private CasServiceBlockingStub getCasServiceBlockingStub() { - return CasServiceGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS); - } - - private CasServiceStub getCasServiceStub() { - return CasServiceGrpc.newStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS); - } - - @Override - public CasLookupReply lookup(CasLookupRequest request) { - return getCasServiceBlockingStub().lookup(request); - } - - @Override - public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) { - return getCasServiceBlockingStub().uploadTreeMetadata(request); - } - - @Override - public CasDownloadTreeMetadataReply downloadTreeMetadata( - CasDownloadTreeMetadataRequest request) { - return getCasServiceBlockingStub().downloadTreeMetadata(request); - } - - @Override - public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) { - return getCasServiceBlockingStub().downloadBlob(request); - } - - @Override - public StreamObserver<CasUploadBlobRequest> uploadBlobAsync( - StreamObserver<CasUploadBlobReply> responseObserver) { - return getCasServiceStub().uploadBlob(responseObserver); - } - }; - } - - /** Create a {@link GrpcCasInterface} instance that actually talks to gRPC. */ - public static GrpcExecutionCacheInterface executionCacheInterface( - final int grpcTimeoutSeconds, - final Channel channel, - final ChannelOptions channelOptions) { - return new GrpcExecutionCacheInterface() { - private ExecutionCacheServiceBlockingStub getExecutionCacheServiceBlockingStub() { - return ExecutionCacheServiceGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS); - } - - @Override - public ExecutionCacheReply getCachedResult(ExecutionCacheRequest request) { - return getExecutionCacheServiceBlockingStub().getCachedResult(request); - } - - @Override - public ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request) { - return getExecutionCacheServiceBlockingStub().setCachedResult(request); - } - }; - } - - /** Create a {@link GrpcExecutionInterface} instance that actually talks to gRPC. */ - public static GrpcExecutionInterface executionInterface( - final int grpcTimeoutSeconds, - final Channel channel, - final ChannelOptions channelOptions) { - return new GrpcExecutionInterface() { - @Override - public Iterator<ExecuteReply> execute(ExecuteRequest request) { - ExecuteServiceBlockingStub stub = - ExecuteServiceGrpc.newBlockingStub(channel) - .withCallCredentials(channelOptions.getCallCredentials()) - .withDeadlineAfter( - grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS); - return stub.execute(request); - } - }; - } -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java index eb79c91919..1950a724c3 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java @@ -15,58 +15,54 @@ package com.google.devtools.build.lib.remote; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; -import io.grpc.ManagedChannel; -import java.util.Iterator; +import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.remoteexecution.v1test.ExecuteRequest; +import com.google.devtools.remoteexecution.v1test.ExecuteResponse; +import com.google.devtools.remoteexecution.v1test.ExecutionGrpc; +import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub; +import com.google.longrunning.Operation; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.Durations; +import io.grpc.Channel; +import io.grpc.protobuf.StatusProto; +import java.util.concurrent.TimeUnit; /** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */ @ThreadSafe -public class GrpcRemoteExecutor extends GrpcActionCache { +public class GrpcRemoteExecutor { + private final RemoteOptions options; + private final ChannelOptions channelOptions; + private final Channel channel; + public static boolean isRemoteExecutionOptions(RemoteOptions options) { return options.remoteExecutor != null; } - private final GrpcExecutionInterface executionIface; - - public GrpcRemoteExecutor( - RemoteOptions options, - GrpcCasInterface casIface, - GrpcExecutionCacheInterface cacheIface, - GrpcExecutionInterface executionIface) { - super(options, casIface, cacheIface); - this.executionIface = executionIface; - } - - public GrpcRemoteExecutor( - ManagedChannel channel, ChannelOptions channelOptions, RemoteOptions options) { - super( - options, - GrpcInterfaces.casInterface(options.remoteTimeout, channel, channelOptions), - GrpcInterfaces.executionCacheInterface( - options.remoteTimeout, channel, channelOptions)); - this.executionIface = - GrpcInterfaces.executionInterface(options.remoteTimeout, channel, channelOptions); + public GrpcRemoteExecutor(Channel channel, ChannelOptions channelOptions, RemoteOptions options) { + this.options = options; + this.channelOptions = channelOptions; + this.channel = channel; } - public ExecuteReply executeRemotely(ExecuteRequest request) { - Iterator<ExecuteReply> replies = executionIface.execute(request); - ExecuteReply reply = null; - while (replies.hasNext()) { - reply = replies.next(); - // We can handle the action execution progress here. + public ExecuteResponse executeRemotely(ExecuteRequest request) { + // TODO(olaola): handle longrunning Operations by using the Watcher API to wait for results. + // For now, only support actions with wait_for_completion = true. + Preconditions.checkArgument(request.getWaitForCompletion()); + int actionSeconds = (int) Durations.toSeconds(request.getAction().getTimeout()); + ExecutionBlockingStub stub = + ExecutionGrpc.newBlockingStub(channel) + .withCallCredentials(channelOptions.getCallCredentials()) + .withDeadlineAfter(options.remoteTimeout + actionSeconds, TimeUnit.SECONDS); + Operation op = stub.execute(request); + Preconditions.checkState(op.getDone()); + Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET); + if (op.getResultCase() == Operation.ResultCase.ERROR) { + throw StatusProto.toStatusRuntimeException(op.getError()); } - if (reply == null) { - return ExecuteReply.newBuilder() - .setStatus( - ExecutionStatus.newBuilder() - .setExecuted(false) - .setSucceeded(false) - .setError(ExecutionStatus.ErrorCode.UNKNOWN_ERROR) - .setErrorDetail("Remote server terminated the connection")) - .build(); + try { + return op.getResponse().unpack(ExecuteResponse.class); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); } - return reply; } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/README.md b/src/main/java/com/google/devtools/build/lib/remote/README.md index 4c85707e71..2525240b93 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/README.md +++ b/src/main/java/com/google/devtools/build/lib/remote/README.md @@ -13,7 +13,7 @@ When it's configured to use a remote cache, Bazel computes a hash for each actio Starting at version 0.5.0, Bazel supports two caching protocols: 1. a HTTP-based REST protocol -2. [a gRPC-based protocol](https://github.com/bazelbuild/bazel/blob/master/src/main/protobuf/remote_protocol.proto) +2. [a gRPC-based protocol](https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto) ## Remote caching using the HTTP REST protocol @@ -101,7 +101,7 @@ this directory to include security control. ## Remote caching using the gRPC protocol -We're working on a [gRPC protocol](https://github.com/bazelbuild/bazel/blob/master/src/main/protobuf/remote_protocol.proto) +We're working on a [gRPC protocol](https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto) that supports both remote caching and remote execution. As of this writing, there is only a single server-side implementation, which is not intended for production use. ### Bazel Setup diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java index 44a63c4d40..84fdb29430 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java @@ -14,15 +14,14 @@ package com.google.devtools.build.lib.remote; -import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible; -import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Digest; import java.io.IOException; import java.util.Collection; import javax.annotation.Nullable; @@ -32,8 +31,8 @@ import javax.annotation.Nullable; interface RemoteActionCache { // CAS API - // TODO(olaola): create a unified set of exceptions raised by the cache to encapsulate the - // underlying CasStatus messages and gRPC errors errors. + // TODO(buchgr): consider removing the CacheNotFoundException, and replacing it with other + // ways to signal a cache miss. /** * Upload enough of the tree metadata and data into remote cache so that the entire tree can be @@ -45,7 +44,7 @@ interface RemoteActionCache { /** * Download the entire tree data rooted by the given digest and write it into the given location. */ - void downloadTree(ContentDigest rootDigest, Path rootLocation) + void downloadTree(Digest rootDigest, Path rootLocation) throws IOException, CacheNotFoundException; /** @@ -68,7 +67,7 @@ interface RemoteActionCache { * * @return The key for fetching the file contents blob from cache. */ - ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException; + Digest uploadFileContents(Path file) throws IOException, InterruptedException; /** * Put the input file contents in cache if it is not already in it. No-op if the data is already @@ -76,22 +75,14 @@ interface RemoteActionCache { * * @return The key for fetching the file contents blob from cache. */ - ContentDigest uploadFileContents( - ActionInput input, Path execRoot, ActionInputFileCache inputCache) + Digest uploadFileContents(ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException, InterruptedException; - /** Upload the given blobs to the cache, and return their digests. */ - ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException; - /** Upload the given blob to the cache, and return its digests. */ - ContentDigest uploadBlob(byte[] blob) throws InterruptedException; + Digest uploadBlob(byte[] blob) throws InterruptedException; /** Download and return a blob with a given digest from the cache. */ - byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException; - - /** Download and return blobs with given digests from the cache. */ - ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests) - throws CacheNotFoundException; + byte[] downloadBlob(Digest digest) throws CacheNotFoundException; // Execution Cache API diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java index fcdb44a14a..d779aa7073 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java @@ -72,30 +72,6 @@ public final class RemoteOptions extends OptionsBase { public String remoteCache; @Option( - name = "grpc_max_chunk_size_bytes", - defaultValue = "16000", - category = "remote", - help = "The maximal number of data bytes to be sent in a single message." - ) - public int grpcMaxChunkSizeBytes; - - @Option( - name = "grpc_max_batch_inputs", - defaultValue = "100", - category = "remote", - help = "The maximal number of input files to be sent in a single batch." - ) - public int grpcMaxBatchInputs; - - @Option( - name = "grpc_max_batch_size_bytes", - defaultValue = "10485760", // 10MB - category = "remote", - help = "The maximal number of input bytes to be sent in a single batch." - ) - public int grpcMaxBatchSizeBytes; - - @Option( name = "remote_timeout", defaultValue = "60", category = "remote", @@ -134,4 +110,12 @@ public final class RemoteOptions extends OptionsBase { help = "Temporary, for testing only. Manually set a Platform to pass to remote execution." ) public String experimentalRemotePlatformOverride; + + @Option( + name = "remote_instance_name", + defaultValue = "", + category = "remote", + help = "Value to pass as instance_name in the remote execution API." + ) + public String remoteInstanceName; } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java index bbb0166e4e..8807d0652b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java @@ -14,32 +14,28 @@ package com.google.devtools.build.lib.remote; -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.devtools.build.lib.actions.ActionExecutionMetadata; import com.google.devtools.build.lib.actions.ActionInput; +import com.google.devtools.build.lib.actions.ExecException; import com.google.devtools.build.lib.actions.Spawn; -import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.actions.Spawns; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.exec.SpawnResult; import com.google.devtools.build.lib.exec.SpawnResult.Status; import com.google.devtools.build.lib.exec.SpawnRunner; -import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.RemoteProtocol.Action; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.Command; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.Platform; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; +import com.google.devtools.remoteexecution.v1test.Action; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Command; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.ExecuteRequest; +import com.google.devtools.remoteexecution.v1test.Platform; +import com.google.protobuf.Duration; import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat.ParseException; import io.grpc.StatusRuntimeException; @@ -49,9 +45,7 @@ import java.util.List; import java.util.SortedMap; import java.util.TreeSet; -/** - * A client for the remote execution service. - */ +/** A client for the remote execution service. */ @ThreadSafe final class RemoteSpawnRunner implements SpawnRunner { private final Path execRoot; @@ -60,11 +54,13 @@ final class RemoteSpawnRunner implements SpawnRunner { private final Platform platform; private final GrpcRemoteExecutor executor; + private final GrpcActionCache remoteCache; RemoteSpawnRunner( Path execRoot, RemoteOptions options, - GrpcRemoteExecutor executor) { + GrpcRemoteExecutor executor, + GrpcActionCache remoteCache) { this.execRoot = execRoot; this.options = options; if (options.experimentalRemotePlatformOverride != null) { @@ -79,27 +75,12 @@ final class RemoteSpawnRunner implements SpawnRunner { platform = null; } this.executor = executor; - } - - RemoteSpawnRunner( - Path execRoot, - RemoteOptions options, - AuthAndTLSOptions authTlsOptions) { - this(execRoot, options, connect(options, authTlsOptions)); - } - - private static GrpcRemoteExecutor connect(RemoteOptions options, - AuthAndTLSOptions authTlsOptions) { - Preconditions.checkArgument(GrpcRemoteExecutor.isRemoteExecutionOptions(options)); - ChannelOptions channelOptions = ChannelOptions.create(authTlsOptions, - options.grpcMaxChunkSizeBytes); - return new GrpcRemoteExecutor( - RemoteUtils.createChannel(options.remoteExecutor, channelOptions), channelOptions, options); + this.remoteCache = remoteCache; } @Override public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy) - throws InterruptedException, IOException { + throws ExecException, InterruptedException, IOException { ActionExecutionMetadata owner = spawn.getResourceOwner(); if (owner.getOwner() != null) { policy.report(ProgressStatus.EXECUTING); @@ -116,46 +97,38 @@ final class RemoteSpawnRunner implements SpawnRunner { Action action = buildAction( spawn.getOutputFiles(), - ContentDigests.computeDigest(command), - repository.getMerkleDigest(inputRoot)); + Digests.computeDigest(command), + repository.getMerkleDigest(inputRoot), + // TODO(olaola): set sensible local and remote timouts. + Spawns.getTimeoutSeconds(spawn, 120)); - ActionKey actionKey = ContentDigests.computeActionKey(action); + ActionKey actionKey = Digests.computeActionKey(action); ActionResult result = - this.options.remoteAcceptCached ? executor.getCachedActionResult(actionKey) : null; + options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null; if (result == null) { // Cache miss or we don't accept cache hits. // Upload the command and all the inputs into the remote cache. - executor.uploadBlob(command.toByteArray()); - // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests! - executor.uploadTree(repository, execRoot, inputRoot); + remoteCache.uploadBlob(command.toByteArray()); + remoteCache.uploadTree(repository, execRoot, inputRoot); // TODO(olaola): set BuildInfo and input total bytes as well. ExecuteRequest.Builder request = ExecuteRequest.newBuilder() + .setInstanceName(options.remoteInstanceName) .setAction(action) - .setAcceptCached(this.options.remoteAcceptCached) + .setWaitForCompletion(true) .setTotalInputFileCount(inputMap.size()) - .setTimeoutMillis(policy.getTimeoutMillis()); - ExecuteReply reply = executor.executeRemotely(request.build()); - ExecutionStatus status = reply.getStatus(); - - if (!status.getSucceeded() - && (status.getError() != ExecutionStatus.ErrorCode.EXEC_FAILED)) { - return new SpawnResult.Builder() - // TODO(ulfjack): Improve the translation of the error status. - .setStatus(Status.EXECUTION_FAILED) - .setExitCode(-1) - .build(); - } - - result = reply.getResult(); + .setSkipCacheLookup(!options.remoteAcceptCached); + result = executor.executeRemotely(request.build()).getResult(); } // TODO(ulfjack): Download stdout, stderr, and the output files in a single call. - passRemoteOutErr(executor, result, policy.getFileOutErr()); - executor.downloadAllResults(result, execRoot); + passRemoteOutErr(remoteCache, result, policy.getFileOutErr()); + if (result.getExitCode() == 0) { + remoteCache.downloadAllResults(result, execRoot); + } return new SpawnResult.Builder() - .setStatus(Status.SUCCESS) - .setExitCode(result.getReturnCode()) + .setStatus(Status.SUCCESS) // Even if the action failed with non-zero exit code. + .setExitCode(result.getExitCode()) .build(); } catch (StatusRuntimeException | CacheNotFoundException e) { throw new IOException(e); @@ -163,37 +136,58 @@ final class RemoteSpawnRunner implements SpawnRunner { } private Action buildAction( - Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) { + Collection<? extends ActionInput> outputs, + Digest command, + Digest inputRoot, + long timeoutSeconds) { Action.Builder action = Action.newBuilder(); action.setCommandDigest(command); action.setInputRootDigest(inputRoot); // Somewhat ugly: we rely on the stable order of outputs here for remote action caching. for (ActionInput output : outputs) { - action.addOutputPath(output.getExecPathString()); + // TODO: output directories should be handled here, when they are supported. + action.addOutputFiles(output.getExecPathString()); } if (platform != null) { action.setPlatform(platform); } + action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds)); return action.build(); } private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) { Command.Builder command = Command.newBuilder(); - command.addAllArgv(arguments); + command.addAllArguments(arguments); // Sorting the environment pairs by variable name. TreeSet<String> variables = new TreeSet<>(environment.keySet()); for (String var : variables) { - command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var)); + command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var)); } return command.build(); } private static void passRemoteOutErr( - RemoteActionCache cache, ActionResult result, FileOutErr outErr) - throws CacheNotFoundException { - ImmutableList<byte[]> streams = - cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest())); - outErr.printOut(new String(streams.get(0), UTF_8)); - outErr.printErr(new String(streams.get(1), UTF_8)); + RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException { + try { + if (!result.getStdoutRaw().isEmpty()) { + result.getStdoutRaw().writeTo(outErr.getOutputStream()); + outErr.getOutputStream().flush(); + } else if (result.hasStdoutDigest()) { + byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest()); + outErr.getOutputStream().write(stdoutBytes); + outErr.getOutputStream().flush(); + } + if (!result.getStderrRaw().isEmpty()) { + result.getStderrRaw().writeTo(outErr.getErrorStream()); + outErr.getErrorStream().flush(); + } else if (result.hasStderrDigest()) { + byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest()); + outErr.getErrorStream().write(stderrBytes); + outErr.getErrorStream().flush(); + } + } catch (CacheNotFoundException e) { + outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss."); + outErr.getOutputStream().flush(); + } } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java index f3e12f07ee..3f7698252e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java @@ -15,7 +15,6 @@ package com.google.devtools.build.lib.remote; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.devtools.build.lib.actions.ActionExecutionContext; import com.google.devtools.build.lib.actions.ActionInput; @@ -32,15 +31,7 @@ import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.EventHandler; import com.google.devtools.build.lib.exec.SpawnInputExpander; -import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.RemoteProtocol.Action; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.Command; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.Platform; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.rules.fileset.FilesetActionContext; import com.google.devtools.build.lib.standalone.StandaloneSpawnStrategy; @@ -48,6 +39,14 @@ import com.google.devtools.build.lib.util.CommandFailureUtils; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; +import com.google.devtools.remoteexecution.v1test.Action; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Command; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.ExecuteRequest; +import com.google.devtools.remoteexecution.v1test.ExecuteResponse; +import com.google.devtools.remoteexecution.v1test.Platform; +import com.google.protobuf.Duration; import com.google.protobuf.TextFormat; import com.google.protobuf.TextFormat.ParseException; import io.grpc.StatusRuntimeException; @@ -88,12 +87,12 @@ final class RemoteSpawnStrategy implements SpawnActionContext { this.standaloneStrategy = new StandaloneSpawnStrategy(execRoot, verboseFailures, productName); this.verboseFailures = verboseFailures; this.remoteOptions = remoteOptions; - channelOptions = ChannelOptions.create(authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes); + channelOptions = ChannelOptions.create(authTlsOptions); if (remoteOptions.experimentalRemotePlatformOverride != null) { Platform.Builder platformBuilder = Platform.newBuilder(); try { - TextFormat.getParser().merge(remoteOptions.experimentalRemotePlatformOverride, - platformBuilder); + TextFormat.getParser() + .merge(remoteOptions.experimentalRemotePlatformOverride, platformBuilder); } catch (ParseException e) { throw new IllegalArgumentException( "Failed to parse --experimental_remote_platform_override", e); @@ -105,27 +104,32 @@ final class RemoteSpawnStrategy implements SpawnActionContext { } private Action buildAction( - Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) { + Collection<? extends ActionInput> outputs, + Digest command, + Digest inputRoot, + long timeoutSeconds) { Action.Builder action = Action.newBuilder(); action.setCommandDigest(command); action.setInputRootDigest(inputRoot); // Somewhat ugly: we rely on the stable order of outputs here for remote action caching. for (ActionInput output : outputs) { - action.addOutputPath(output.getExecPathString()); + // TODO: output directories should be handled here, when they are supported. + action.addOutputFiles(output.getExecPathString()); } if (platform != null) { action.setPlatform(platform); } + action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds)); return action.build(); } private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) { Command.Builder command = Command.newBuilder(); - command.addAllArgv(arguments); + command.addAllArguments(arguments); // Sorting the environment pairs by variable name. TreeSet<String> variables = new TreeSet<>(environment.keySet()); for (String var : variables) { - command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var)); + command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var)); } return command.build(); } @@ -137,11 +141,11 @@ final class RemoteSpawnStrategy implements SpawnActionContext { private void execLocally( Spawn spawn, ActionExecutionContext actionExecutionContext, - RemoteActionCache actionCache, + RemoteActionCache remoteCache, ActionKey actionKey) throws ExecException, InterruptedException { standaloneStrategy.exec(spawn, actionExecutionContext); - if (remoteOptions.remoteUploadLocalResults && actionCache != null && actionKey != null) { + if (remoteOptions.remoteUploadLocalResults && remoteCache != null && actionKey != null) { ArrayList<Path> outputFiles = new ArrayList<>(); for (ActionInput output : spawn.getOutputFiles()) { Path outputFile = execRoot.getRelative(output.getExecPathString()); @@ -155,17 +159,17 @@ final class RemoteSpawnStrategy implements SpawnActionContext { } try { ActionResult.Builder result = ActionResult.newBuilder(); - actionCache.uploadAllResults(execRoot, outputFiles, result); + remoteCache.uploadAllResults(execRoot, outputFiles, result); FileOutErr outErr = actionExecutionContext.getFileOutErr(); if (outErr.getErrorPath().exists()) { - ContentDigest stderr = actionCache.uploadFileContents(outErr.getErrorPath()); + Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath()); result.setStderrDigest(stderr); } if (outErr.getOutputPath().exists()) { - ContentDigest stdout = actionCache.uploadFileContents(outErr.getOutputPath()); + Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath()); result.setStdoutDigest(stdout); } - actionCache.setCachedActionResult(actionKey, result.build()); + remoteCache.setCachedActionResult(actionKey, result.build()); // Handle all cache errors here. } catch (IOException e) { throw new UserExecException("Unexpected IO error.", e); @@ -188,21 +192,25 @@ final class RemoteSpawnStrategy implements SpawnActionContext { private static void passRemoteOutErr( RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException { try { - ImmutableList<byte[]> streams = - cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest())); - byte[] stdout = streams.get(0); - byte[] stderr = streams.get(1); - // Don't round-trip through String - write to the output streams directly. - if (stdout.length != 0) { - outErr.getOutputStream().write(stdout); + if (!result.getStdoutRaw().isEmpty()) { + result.getStdoutRaw().writeTo(outErr.getOutputStream()); + outErr.getOutputStream().flush(); + } else if (result.hasStdoutDigest()) { + byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest()); + outErr.getOutputStream().write(stdoutBytes); outErr.getOutputStream().flush(); } - if (stderr.length != 0) { - outErr.getErrorStream().write(stderr); + if (!result.getStderrRaw().isEmpty()) { + result.getStderrRaw().writeTo(outErr.getErrorStream()); + outErr.getErrorStream().flush(); + } else if (result.hasStderrDigest()) { + byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest()); + outErr.getErrorStream().write(stderrBytes); outErr.getErrorStream().flush(); } } catch (CacheNotFoundException e) { - // Ignoring. + outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss."); + outErr.getOutputStream().flush(); } } @@ -220,19 +228,23 @@ final class RemoteSpawnStrategy implements SpawnActionContext { Executor executor = actionExecutionContext.getExecutor(); EventHandler eventHandler = executor.getEventHandler(); - RemoteActionCache actionCache = null; + RemoteActionCache remoteCache = null; GrpcRemoteExecutor workExecutor = null; if (spawn.isRemotable()) { // Initialize remote cache and execution handlers. We use separate handlers for every // action to enable server-side parallelism (need a different gRPC channel per action). if (SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)) { - actionCache = new SimpleBlobStoreActionCache(SimpleBlobStoreFactory.create(remoteOptions)); + remoteCache = new SimpleBlobStoreActionCache(SimpleBlobStoreFactory.create(remoteOptions)); } else if (GrpcActionCache.isRemoteCacheOptions(remoteOptions)) { - actionCache = new GrpcActionCache(remoteOptions, channelOptions); + remoteCache = + new GrpcActionCache( + RemoteUtils.createChannel(remoteOptions.remoteCache, channelOptions), + channelOptions, + remoteOptions); } - // Otherwise actionCache remains null and remote caching/execution are disabled. + // Otherwise remoteCache remains null and remote caching/execution are disabled. - if (actionCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(remoteOptions)) { + if (remoteCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(remoteOptions)) { workExecutor = new GrpcRemoteExecutor( RemoteUtils.createChannel(remoteOptions.remoteExecutor, channelOptions), @@ -240,15 +252,16 @@ final class RemoteSpawnStrategy implements SpawnActionContext { remoteOptions); } } - if (!spawn.isRemotable() || actionCache == null) { + if (!spawn.isRemotable() || remoteCache == null) { standaloneStrategy.exec(spawn, actionExecutionContext); return; } if (executor.reportsSubcommands()) { executor.reportSubcommand(spawn); } - executor.getEventBus().post( - ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote")); + executor + .getEventBus() + .post(ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote")); try { // Temporary hack: the TreeNodeRepository should be created and maintained upstream! @@ -266,22 +279,25 @@ final class RemoteSpawnStrategy implements SpawnActionContext { Action action = buildAction( spawn.getOutputFiles(), - ContentDigests.computeDigest(command), - repository.getMerkleDigest(inputRoot)); + Digests.computeDigest(command), + repository.getMerkleDigest(inputRoot), + // TODO(olaola): set sensible local and remote timouts. + Spawns.getTimeoutSeconds(spawn, 120)); // Look up action cache, and reuse the action output if it is found. - actionKey = ContentDigests.computeActionKey(action); - ActionResult result = this.remoteOptions.remoteAcceptCached - ? actionCache.getCachedActionResult(actionKey) - : null; + actionKey = Digests.computeActionKey(action); + ActionResult result = + this.remoteOptions.remoteAcceptCached + ? remoteCache.getCachedActionResult(actionKey) + : null; boolean acceptCachedResult = this.remoteOptions.remoteAcceptCached; if (result != null) { // We don't cache failed actions, so we know the outputs exist. // For now, download all outputs locally; in the future, we can reuse the digests to // just update the TreeNodeRepository and continue the build. try { - actionCache.downloadAllResults(result, execRoot); - passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr()); + remoteCache.downloadAllResults(result, execRoot); + passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr()); return; } catch (CacheNotFoundException e) { acceptCachedResult = false; // Retry the action remotely and invalidate the results. @@ -289,47 +305,39 @@ final class RemoteSpawnStrategy implements SpawnActionContext { } if (workExecutor == null) { - execLocally(spawn, actionExecutionContext, actionCache, actionKey); + execLocally(spawn, actionExecutionContext, remoteCache, actionKey); return; } // Upload the command and all the inputs into the remote cache. - actionCache.uploadBlob(command.toByteArray()); - // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests! - actionCache.uploadTree(repository, execRoot, inputRoot); + remoteCache.uploadBlob(command.toByteArray()); + remoteCache.uploadTree(repository, execRoot, inputRoot); // TODO(olaola): set BuildInfo and input total bytes as well. ExecuteRequest.Builder request = ExecuteRequest.newBuilder() + .setInstanceName(remoteOptions.remoteInstanceName) .setAction(action) - .setAcceptCached(acceptCachedResult) + .setWaitForCompletion(true) .setTotalInputFileCount(inputMap.size()) - .setTimeoutMillis(1000 * Spawns.getTimeoutSeconds(spawn, 120)); - // TODO(olaola): set sensible local and remote timouts. - ExecuteReply reply = workExecutor.executeRemotely(request.build()); - ExecutionStatus status = reply.getStatus(); + .setSkipCacheLookup(!acceptCachedResult); + ExecuteResponse reply = workExecutor.executeRemotely(request.build()); result = reply.getResult(); - // We do not want to pass on the remote stdout and strerr if we are going to retry the - // action. - if (status.getSucceeded()) { - passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr()); - actionCache.downloadAllResults(result, execRoot); - if (result.getReturnCode() != 0) { - String cwd = executor.getExecRoot().getPathString(); - String message = - CommandFailureUtils.describeCommandFailure( - verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd); - throw new UserExecException(message + ": Exit " + result.getReturnCode()); - } + if (remoteOptions.remoteLocalFallback && result.getExitCode() != 0) { + execLocally(spawn, actionExecutionContext, remoteCache, actionKey); return; } - if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED - || !remoteOptions.remoteLocalFallback) { - passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr()); - throw new UserExecException(status.getErrorDetail()); + passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr()); + if (result.getExitCode() == 0) { + remoteCache.downloadAllResults(result, execRoot); + } else { + String cwd = executor.getExecRoot().getPathString(); + String message = + CommandFailureUtils.describeCommandFailure( + verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd); + throw new UserExecException(message + ": Exit " + result.getExitCode()); } // For now, we retry locally on all other remote errors. // TODO(olaola): add remote retries on cache miss errors. - execLocally(spawn, actionExecutionContext, actionCache, actionKey); } catch (IOException e) { throw new UserExecException("Unexpected IO error.", e); } catch (InterruptedException e) { @@ -343,14 +351,15 @@ final class RemoteSpawnStrategy implements SpawnActionContext { } eventHandler.handle(Event.warn(mnemonic + " remote work failed (" + e + ")" + stackTrace)); if (remoteOptions.remoteLocalFallback) { - execLocally(spawn, actionExecutionContext, actionCache, actionKey); + execLocally(spawn, actionExecutionContext, remoteCache, actionKey); } else { throw new UserExecException(e); } } catch (CacheNotFoundException e) { + // TODO(olaola): handle this exception by reuploading / reexecuting the action remotely. eventHandler.handle(Event.warn(mnemonic + " remote work results cache miss (" + e + ")")); if (remoteOptions.remoteLocalFallback) { - execLocally(spawn, actionExecutionContext, actionCache, actionKey); + execLocally(spawn, actionExecutionContext, remoteCache, actionKey); } else { throw new UserExecException(e); } diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java index 845f5411fe..58b2c94bf8 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java @@ -26,8 +26,7 @@ public final class RemoteUtils { NettyChannelBuilder builder = NettyChannelBuilder.forTarget(target) .negotiationType( - channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT) - .maxMessageSize(channelOptions.maxMessageSize()); + channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT); if (channelOptions.getSslContext() != null) { builder.sslContext(channelOptions.getSslContext()); if (channelOptions.getTlsAuthorityOverride() != null) { diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java index bb1ca512d5..75d208a687 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java +++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java @@ -14,28 +14,27 @@ package com.google.devtools.build.lib.remote; -import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; -import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; -import com.google.devtools.build.lib.remote.RemoteProtocol.Output; -import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.Directory; +import com.google.devtools.remoteexecution.v1test.DirectoryNode; +import com.google.devtools.remoteexecution.v1test.FileNode; +import com.google.devtools.remoteexecution.v1test.OutputDirectory; +import com.google.devtools.remoteexecution.v1test.OutputFile; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.Semaphore; @@ -59,8 +58,8 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root) throws IOException, InterruptedException { repository.computeMerkleDigests(root); - for (FileNode fileNode : repository.treeToFileNodes(root)) { - uploadBlob(fileNode.toByteArray()); + for (Directory directory : repository.treeToDirectories(root)) { + uploadBlob(directory.toByteArray()); } // TODO(ulfjack): Only upload files that aren't in the CAS yet? for (TreeNode leaf : repository.leaves(root)) { @@ -69,26 +68,26 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } @Override - public void downloadTree(ContentDigest rootDigest, Path rootLocation) + public void downloadTree(Digest rootDigest, Path rootLocation) throws IOException, CacheNotFoundException { - FileNode fileNode = FileNode.parseFrom(downloadBlob(rootDigest)); - if (fileNode.hasFileMetadata()) { - FileMetadata meta = fileNode.getFileMetadata(); - downloadFileContents(meta.getDigest(), rootLocation, meta.getExecutable()); + Directory directory = Directory.parseFrom(downloadBlob(rootDigest)); + for (FileNode file : directory.getFilesList()) { + downloadFileContents( + file.getDigest(), rootLocation.getRelative(file.getName()), file.getIsExecutable()); } - for (FileNode.Child child : fileNode.getChildList()) { - downloadTree(child.getDigest(), rootLocation.getRelative(child.getPath())); + for (DirectoryNode child : directory.getDirectoriesList()) { + downloadTree(child.getDigest(), rootLocation.getRelative(child.getName())); } } @Override - public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException { + public Digest uploadFileContents(Path file) throws IOException, InterruptedException { // This unconditionally reads the whole file into memory first! return uploadBlob(ByteString.readFrom(file.getInputStream()).toByteArray()); } @Override - public ContentDigest uploadFileContents( + public Digest uploadFileContents( ActionInput input, Path execRoot, ActionInputFileCache inputCache) throws IOException, InterruptedException { // This unconditionally reads the whole file into memory first! @@ -96,26 +95,31 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { ByteArrayOutputStream buffer = new ByteArrayOutputStream(); ((VirtualActionInput) input).writeTo(buffer); byte[] blob = buffer.toByteArray(); - return uploadBlob(blob, ContentDigests.computeDigest(blob)); + return uploadBlob(blob, Digests.computeDigest(blob)); } return uploadBlob( ByteString.readFrom(execRoot.getRelative(input.getExecPathString()).getInputStream()) .toByteArray(), - ContentDigests.getDigestFromInputCache(input, inputCache)); + Digests.getDigestFromInputCache(input, inputCache)); } @Override public void downloadAllResults(ActionResult result, Path execRoot) throws IOException, CacheNotFoundException { - for (Output output : result.getOutputList()) { - if (output.getContentCase() == ContentCase.FILE_METADATA) { - FileMetadata m = output.getFileMetadata(); - downloadFileContents( - m.getDigest(), execRoot.getRelative(output.getPath()), m.getExecutable()); + for (OutputFile file : result.getOutputFilesList()) { + if (!file.getContent().isEmpty()) { + createFile( + file.getContent().toByteArray(), + execRoot.getRelative(file.getPath()), + file.getIsExecutable()); } else { - downloadTree(output.getDigest(), execRoot.getRelative(output.getPath())); + downloadFileContents( + file.getDigest(), execRoot.getRelative(file.getPath()), file.getIsExecutable()); } } + for (OutputDirectory directory : result.getOutputDirectoriesList()) { + downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath())); + } } @Override @@ -130,22 +134,25 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { // TreeNodeRepository to call uploadTree. throw new UnsupportedOperationException("Storing a directory is not yet supported."); } + // TODO(olaola): inline small file contents here. // First put the file content to cache. - ContentDigest digest = uploadFileContents(file); + Digest digest = uploadFileContents(file); // Add to protobuf. result - .addOutputBuilder() + .addOutputFilesBuilder() .setPath(file.relativeTo(execRoot).getPathString()) - .getFileMetadataBuilder() .setDigest(digest) - .setExecutable(file.isExecutable()); + .setIsExecutable(file.isExecutable()); } } - private void downloadFileContents(ContentDigest digest, Path dest, boolean executable) + private void downloadFileContents(Digest digest, Path dest, boolean executable) throws IOException, CacheNotFoundException { // This unconditionally downloads the whole file into memory first! - byte[] contents = downloadBlob(digest); + createFile(downloadBlob(digest), dest, executable); + } + + private void createFile(byte[] contents, Path dest, boolean executable) throws IOException { FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory()); try (OutputStream stream = dest.getOutputStream()) { stream.write(contents); @@ -153,16 +160,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { dest.setExecutable(executable); } - @Override - public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) - throws InterruptedException { - ArrayList<ContentDigest> digests = new ArrayList<>(); - for (byte[] blob : blobs) { - digests.add(uploadBlob(blob)); - } - return ImmutableList.copyOf(digests); - } - private void checkBlobSize(long blobSizeKBytes, String type) { Preconditions.checkArgument( blobSizeKBytes < MAX_MEMORY_KBYTES, @@ -172,16 +169,16 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } @Override - public ContentDigest uploadBlob(byte[] blob) throws InterruptedException { - return uploadBlob(blob, ContentDigests.computeDigest(blob)); + public Digest uploadBlob(byte[] blob) throws InterruptedException { + return uploadBlob(blob, Digests.computeDigest(blob)); } - private ContentDigest uploadBlob(byte[] blob, ContentDigest digest) throws InterruptedException { + private Digest uploadBlob(byte[] blob, Digest digest) throws InterruptedException { int blobSizeKBytes = blob.length / 1024; checkBlobSize(blobSizeKBytes, "Upload"); uploadMemoryAvailable.acquire(blobSizeKBytes); try { - blobStore.put(ContentDigests.toHexString(digest), blob); + blobStore.put(digest.getHash(), blob); } finally { uploadMemoryAvailable.release(blobSizeKBytes); } @@ -189,36 +186,26 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { } @Override - public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException { + public byte[] downloadBlob(Digest digest) throws CacheNotFoundException { if (digest.getSizeBytes() == 0) { return new byte[0]; } // This unconditionally downloads the whole blob into memory! checkBlobSize(digest.getSizeBytes() / 1024, "Download"); - byte[] data = blobStore.get(ContentDigests.toHexString(digest)); + byte[] data = blobStore.get(digest.getHash()); if (data == null) { throw new CacheNotFoundException(digest); } return data; } - @Override - public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests) - throws CacheNotFoundException { - ArrayList<byte[]> blobs = new ArrayList<>(); - for (ContentDigest c : digests) { - blobs.add(downloadBlob(c)); - } - return ImmutableList.copyOf(blobs); - } - - public boolean containsKey(ContentDigest digest) { - return blobStore.containsKey(ContentDigests.toHexString(digest)); + public boolean containsKey(Digest digest) { + return blobStore.containsKey(digest.getHash()); } @Override public ActionResult getCachedActionResult(ActionKey actionKey) { - byte[] data = blobStore.get(ContentDigests.toHexString(actionKey.getDigest())); + byte[] data = blobStore.get(actionKey.getDigest().getHash()); if (data == null) { return null; } @@ -232,6 +219,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache { @Override public void setCachedActionResult(ActionKey actionKey, ActionResult result) throws InterruptedException { - blobStore.put(ContentDigests.toHexString(actionKey.getDigest()), result.toByteArray()); + blobStore.put(actionKey.getDigest().getHash(), result.toByteArray()); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java index 8272295a84..dba3a14c12 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java +++ b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java @@ -29,11 +29,11 @@ import com.google.devtools.build.lib.concurrent.BlazeInterners; import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable; import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe; import com.google.devtools.build.lib.exec.SpawnInputExpander; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.Directory; import com.google.protobuf.ByteString; import java.io.IOException; import java.util.ArrayList; @@ -185,11 +185,11 @@ public final class TreeNodeRepository extends TreeTraverser<TreeNodeRepository.T // be part of the state. private final Path execRoot; private final ActionInputFileCache inputFileCache; - private final Map<TreeNode, ContentDigest> treeNodeDigestCache = new HashMap<>(); - private final Map<ContentDigest, TreeNode> digestTreeNodeCache = new HashMap<>(); - private final Map<TreeNode, FileNode> fileNodeCache = new HashMap<>(); - private final Map<VirtualActionInput, ContentDigest> virtualInputDigestCache = new HashMap<>(); - private final Map<ContentDigest, VirtualActionInput> digestVirtualInputCache = new HashMap<>(); + private final Map<TreeNode, Digest> treeNodeDigestCache = new HashMap<>(); + private final Map<Digest, TreeNode> digestTreeNodeCache = new HashMap<>(); + private final Map<TreeNode, Directory> directoryCache = new HashMap<>(); + private final Map<VirtualActionInput, Digest> virtualInputDigestCache = new HashMap<>(); + private final Map<Digest, VirtualActionInput> digestVirtualInputCache = new HashMap<>(); public TreeNodeRepository(Path execRoot, ActionInputFileCache inputFileCache) { this.execRoot = execRoot; @@ -298,125 +298,125 @@ public final class TreeNodeRepository extends TreeTraverser<TreeNodeRepository.T return interner.intern(new TreeNode(entries)); } - private synchronized FileNode getOrComputeFileNode(TreeNode node) throws IOException { + private synchronized Directory getOrComputeDirectory(TreeNode node) throws IOException { // Assumes all child digests have already been computed! - FileNode fileNode = fileNodeCache.get(node); - if (fileNode == null) { - FileNode.Builder b = FileNode.newBuilder(); - if (node.isLeaf()) { - ActionInput input = node.getActionInput(); - if (input instanceof VirtualActionInput) { - VirtualActionInput virtualInput = (VirtualActionInput) input; - ContentDigest digest = ContentDigests.computeDigest(virtualInput); - virtualInputDigestCache.put(virtualInput, digest); - // There may be multiple inputs with the same digest. In that case, we don't care which - // one we get back from the digestVirtualInputCache later. - digestVirtualInputCache.put(digest, virtualInput); - b.getFileMetadataBuilder() - .setDigest(digest) - // We always declare virtual action inputs as non-executable for now. - .setExecutable(false); + Preconditions.checkArgument(!node.isLeaf()); + Directory directory = directoryCache.get(node); + if (directory == null) { + Directory.Builder b = Directory.newBuilder(); + for (TreeNode.ChildEntry entry : node.getChildEntries()) { + TreeNode child = entry.getChild(); + if (child.isLeaf()) { + ActionInput input = child.getActionInput(); + if (input instanceof VirtualActionInput) { + VirtualActionInput virtualInput = (VirtualActionInput) input; + Digest digest = Digests.computeDigest(virtualInput); + virtualInputDigestCache.put(virtualInput, digest); + // There may be multiple inputs with the same digest. In that case, we don't care which + // one we get back from the digestVirtualInputCache later. + digestVirtualInputCache.put(digest, virtualInput); + b.addFilesBuilder() + .setName(entry.getSegment()) + .setDigest(digest) + .setIsExecutable(false); + } else { + b.addFilesBuilder() + .setName(entry.getSegment()) + .setDigest(Digests.getDigestFromInputCache(input, inputFileCache)) + .setIsExecutable(execRoot.getRelative(input.getExecPathString()).isExecutable()); + } } else { - b.getFileMetadataBuilder() - .setDigest(ContentDigests.getDigestFromInputCache(input, inputFileCache)) - .setExecutable(execRoot.getRelative(input.getExecPathString()).isExecutable()); - } - } else { - for (TreeNode.ChildEntry entry : node.getChildEntries()) { - ContentDigest childDigest = treeNodeDigestCache.get(entry.getChild()); - Preconditions.checkState(childDigest != null); - b.addChildBuilder().setPath(entry.getSegment()).setDigest(childDigest); + Digest childDigest = Preconditions.checkNotNull(treeNodeDigestCache.get(child)); + b.addDirectoriesBuilder().setName(entry.getSegment()).setDigest(childDigest); } } - fileNode = b.build(); - fileNodeCache.put(node, fileNode); - ContentDigest digest = ContentDigests.computeDigest(fileNode); + directory = b.build(); + directoryCache.put(node, directory); + Digest digest = Digests.computeDigest(directory); treeNodeDigestCache.put(node, digest); digestTreeNodeCache.put(digest, node); } - return fileNode; + return directory; } // Recursively traverses the tree, expanding and computing Merkle digests for nodes for which // they have not yet been computed and cached. public void computeMerkleDigests(TreeNode root) throws IOException { synchronized (this) { - if (fileNodeCache.get(root) != null) { + if (directoryCache.get(root) != null) { // Strong assumption: the cache is valid, i.e. parent present implies children present. return; } } - if (root.isLeaf()) { - ActionInput input = root.getActionInput(); - if (!(input instanceof VirtualActionInput)) { - inputFileCache.getDigest(input); - } - } else { + if (!root.isLeaf()) { for (TreeNode child : children(root)) { computeMerkleDigests(child); } + getOrComputeDirectory(root); } - getOrComputeFileNode(root); } /** * Should only be used after computeMerkleDigests has been called on one of the node ancestors. * Returns the precomputed digest. */ - public ContentDigest getMerkleDigest(TreeNode node) { - return treeNodeDigestCache.get(node); + public Digest getMerkleDigest(TreeNode node) throws IOException { + return node.isLeaf() + ? actionInputToDigest(node.getActionInput()) + : treeNodeDigestCache.get(node); } /** * Returns the precomputed digests for both data and metadata. Should only be used after * computeMerkleDigests has been called on one of the node ancestors. */ - public ImmutableCollection<ContentDigest> getAllDigests(TreeNode root) throws IOException { - ImmutableSet.Builder<ContentDigest> digests = ImmutableSet.builder(); + public ImmutableCollection<Digest> getAllDigests(TreeNode root) throws IOException { + ImmutableSet.Builder<Digest> digests = ImmutableSet.builder(); for (TreeNode node : descendants(root)) { - digests.add(Preconditions.checkNotNull(treeNodeDigestCache.get(node))); - if (node.isLeaf()) { - digests.add(actionInputToDigest(node.getActionInput())); - } + digests.add( + node.isLeaf() + ? actionInputToDigest(node.getActionInput()) + : Preconditions.checkNotNull(treeNodeDigestCache.get(node))); } return digests.build(); } - private ContentDigest actionInputToDigest(ActionInput input) throws IOException { + private Digest actionInputToDigest(ActionInput input) throws IOException { if (input instanceof VirtualActionInput) { return Preconditions.checkNotNull(virtualInputDigestCache.get(input)); } - return ContentDigests.getDigestFromInputCache(input, inputFileCache); + return Digests.getDigestFromInputCache(input, inputFileCache); } /** - * Serializes all of the subtree to the file node list. TODO(olaola): add a version that only - * copies a part of the tree that we are interested in. Should only be used after - * computeMerkleDigests has been called on one of the node ancestors. + * Serializes all of the subtree to a Directory list. TODO(olaola): add a version that only copies + * a part of the tree that we are interested in. Should only be used after computeMerkleDigests + * has been called on one of the node ancestors. */ // Note: this is not, strictly speaking, thread safe. If someone is deleting cached Merkle hashes // while this is executing, it will trigger an exception. But I think this is WAI. - public ImmutableList<FileNode> treeToFileNodes(TreeNode root) { - ImmutableList.Builder<FileNode> fileNodes = ImmutableList.builder(); + public ImmutableList<Directory> treeToDirectories(TreeNode root) { + ImmutableList.Builder<Directory> directories = ImmutableList.builder(); for (TreeNode node : descendants(root)) { - fileNodes.add(Preconditions.checkNotNull(fileNodeCache.get(node))); + if (!node.isLeaf()) { + directories.add(Preconditions.checkNotNull(directoryCache.get(node))); + } } - return fileNodes.build(); + return directories.build(); } /** * Should only be used on digests created by a call to computeMerkleDigests. Looks up ActionInputs - * or FileNodes by cached digests and adds them to the lists. + * or Directory messages by cached digests and adds them to the lists. */ public void getDataFromDigests( - Iterable<ContentDigest> digests, List<ActionInput> actionInputs, List<FileNode> nodes) { - for (ContentDigest digest : digests) { + Iterable<Digest> digests, List<ActionInput> actionInputs, List<Directory> nodes) { + for (Digest digest : digests) { TreeNode treeNode = digestTreeNodeCache.get(digest); if (treeNode != null) { - nodes.add(Preconditions.checkNotNull(fileNodeCache.get(treeNode))); - } else { - // If not there, it must be an ActionInput. - ByteString hexDigest = ByteString.copyFromUtf8(ContentDigests.toHexString(digest)); + nodes.add(Preconditions.checkNotNull(directoryCache.get(treeNode))); + } else { // If not there, it must be an ActionInput. + ByteString hexDigest = ByteString.copyFromUtf8(digest.getHash()); ActionInput input = inputFileCache.getInputFromDigest(hexDigest); if (input == null) { // ... or a VirtualActionInput. diff --git a/src/main/protobuf/BUILD b/src/main/protobuf/BUILD index 96a9394a04..d08393d17e 100644 --- a/src/main/protobuf/BUILD +++ b/src/main/protobuf/BUILD @@ -22,7 +22,6 @@ FILES = [ "android_deploy_info", "apk_manifest", "command_server", - "remote_protocol", ] [proto_library( @@ -59,13 +58,6 @@ cc_grpc_library( src = "command_server.proto", ) -# TODO(olaola): add Golang support. -java_grpc_library( - name = "remote_protocol_java_grpc", - srcs = [":remote_protocol_proto"], - deps = [":remote_protocol_java_proto"], -) - py_proto_library( name = "build_pb_py", srcs = ["build.proto"], @@ -82,6 +74,5 @@ filegroup( name = "dist_jars", srcs = [s + "_java_proto_srcs" for s in FILES] + [ ":command_server_java_grpc_srcs", - ":remote_protocol_java_grpc_srcs", ], ) diff --git a/src/main/protobuf/remote_protocol.proto b/src/main/protobuf/remote_protocol.proto deleted file mode 100644 index 2a68643d1e..0000000000 --- a/src/main/protobuf/remote_protocol.proto +++ /dev/null @@ -1,329 +0,0 @@ -// Copyright 2015 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -syntax = "proto3"; - -package google.devtools.remote_execution; - -option java_package = "com.google.devtools.build.lib.remote"; - -message ContentDigest { - bytes digest = 1; // A hash of the contents of the file, blob, or tree node - // (see below). The contents are digested using SHA1 (20 bytes). - int64 size_bytes = 2; - int32 version = 3; // Will be 0 by default, and increase if we ever change - // the hash function used, etc. - // And maybe other metadata later. -} - -message FileMetadata { - ContentDigest digest = 1; - bool executable = 3; -} - -// A Platform is defined by a set of opaque name-value pairs. -message Platform { - message Property { - string name = 1; - string value = 2; - } - // Entries are sorted by name,value to ensure a canonical form. - repeated Property entry = 1; -} - -// All the fields of this message have the following in common: they are -// expected to define a result. All instructions to remote execution that do -// not affect the result should be out of this message, because this message -// will be used as the Execution Cache key. -message Action { - // The digest of the command (as an encoded proto blob). - // Digested separately, because it is both long and reusable. - ContentDigest command_digest = 1; - // The Merkle digest of the root input directory. - ContentDigest input_root_digest = 2; - // Relative paths to the action's output files and directories. - // The standard output and error stream contents are always returned - // in addition (see ActionResult message). - // Any unnamed output files will be discarded. - // A path will be interpreted as a directory iff it contains a trailing - // slash. - // This should be sorted! - repeated string output_path = 4; - Platform platform = 5; - // All other requirements that affect the result of the action should be - // here as well. -} - -message Command { - // The argument vector. argv[0] is the binary. - repeated string argv = 1; - // A map of environment variables. - // This is part of the command rather than Platform, because it does not - // affect scheduling. - message EnvironmentEntry { - string variable = 1; - string value = 2; - } - // Environment variables, sorted by variable. - repeated EnvironmentEntry environment = 2; - // Possibly other fields the describe the setup actions of the command. -} - -message Output { - string path = 1; - // Yes, we shouldn't have to repeat paths here, but it should not be too - // costly, and might help. - // The actual output bodies will be stored in CAS. - oneof content { - // Populated for directory outputs only, contains the root FileNode digest. - ContentDigest digest = 2; - // Populated for file outputs only. - FileMetadata file_metadata = 3; - } -} - -message ActionResult { - // Relative paths to the action's output files and directories. - // Any unnamed output files will be discarded. - // A path will be interpreted as a directory iff it contains a trailing - // slash. - // This should be sorted by path. - repeated Output output = 1; - int32 return_code = 2; - ContentDigest stdout_digest = 3; - ContentDigest stderr_digest = 4; -} - -// Status message shared by all CAS requests. -message CasStatus { - bool succeeded = 1; // Whether the requested action had server side errors - // or not. - enum ErrorCode { - UNKNOWN = 0; - INVALID_ARGUMENT = 1; // The client behaved incorrectly. error_detail should - // have more information. - MISSING_DIGEST = 2; // Missing a node on tree download. - DIGEST_MISMATCH = 3; // Upload only error, when requested digest does not - // match the server side computed one. - NODE_PARSE_ERROR = 4; // Failed to parse digested data as node. - // more errors... - } - ErrorCode error = 2; - string error_detail = 3; // Human readable error. - // These are a common part of the status for many CAS requests: - repeated ContentDigest missing_digest = 4; - repeated ContentDigest parse_failed_digest = 5; // Only relevant to trees. -} - -service CasService { - // Looks up given content keys in CAS, and returns success when found. - // The single returned status will have the potentially missing digests, - // which need to be re-uploaded. - rpc Lookup(CasLookupRequest) returns (CasLookupReply) { } - // Uploads a directory tree into CAS. Not streamed, because it is only tree - // metadata. - rpc UploadTreeMetadata(CasUploadTreeMetadataRequest) returns - (CasUploadTreeMetadataReply) { } - // Uploads data blob(s) into CAS. - rpc UploadBlob(stream CasUploadBlobRequest) returns (CasUploadBlobReply) { } - // Downoads a directory tree metadata from CAS. - rpc DownloadTreeMetadata(CasDownloadTreeMetadataRequest) returns - (CasDownloadTreeMetadataReply) { } - // Downoads a directory tree from CAS. Returns the entire root directory. - rpc DownloadTree(CasDownloadTreeRequest) returns (stream CasDownloadReply) { } - // Downoads data blob(s) from CAS, returns them. - rpc DownloadBlob(CasDownloadBlobRequest) returns (stream CasDownloadReply) { } -} - -message FileNode { - FileMetadata file_metadata = 1; - message Child { - string path = 1; - ContentDigest digest = 2; - } - // The children should be sorted by path, and not have equal subdirectory - // prefixes. - repeated Child child = 2; -} - -message CasLookupRequest { - repeated ContentDigest digest = 1; -} - -message CasLookupReply { - CasStatus status = 1; -} - -message CasUploadTreeMetadataRequest { - repeated FileNode tree_node = 1; -} - -message CasUploadTreeMetadataReply { - CasStatus status = 1; -} - -message CasDownloadTreeMetadataRequest { - ContentDigest root = 1; -} - -message CasDownloadTreeMetadataReply { - CasStatus status = 1; - repeated FileNode tree_node = 2; -} - -message BlobChunk { - ContentDigest digest = 1; // Present only in first chunk. - int64 offset = 2; - bytes data = 3; -} - -// This will be used for batching files/blobs. -message CasUploadBlobRequest { - BlobChunk data = 1; -} - -message CasUploadBlobReply { - CasStatus status = 1; -} - -message CasDownloadTreeRequest { - ContentDigest root_digest = 1; -} - -// This message is streamed. -message CasDownloadReply { - CasStatus status = 1; - BlobChunk data = 2; - // For trees, data is the entire root directory, zipped (for a single root). - // For blobs, sequential chunks of multiple blobs. -} - -message CasDownloadBlobRequest { - repeated ContentDigest digest = 1; -} - -service ExecutionCacheService { - // Gets results of a cached action. - rpc GetCachedResult(ExecutionCacheRequest) returns (ExecutionCacheReply) { } - // Set results of a cached action. This requires reproducible builds on - // connected machines! - rpc SetCachedResult(ExecutionCacheSetRequest) returns - (ExecutionCacheSetReply) {} -} - -message ExecutionCacheRequest { - ContentDigest action_digest = 1; -} - -message ExecutionCacheStatus { - // Whether the request had any server side errors. For a lookup (get result) - // request, a true value means the result was found in the cache. - bool succeeded = 1; - enum ErrorCode { - UNKNOWN = 0; - MISSING_RESULT = 1; // The result was not found in the execution cache. - UNSUPPORTED = 2; // The request is not supported by the server. - // More server errors... - } - ErrorCode error = 2; - string error_detail = 3; // Human readable error. -} - -message ExecutionCacheReply { - ExecutionCacheStatus status = 1; - ActionResult result = 2; -} - -message ExecutionCacheSetRequest { - ContentDigest action_digest = 1; - ActionResult result = 2; -} - -message ExecutionCacheSetReply { - ExecutionCacheStatus status = 1; -} - -service ExecuteService { - // Executes an action remotely. - rpc Execute(ExecuteRequest) returns (stream ExecuteReply) { } -} - -message ExecuteRequest { - Action action = 1; - bool accept_cached = 2; - // Later will probably add previous attempt history, as it will be - // useful for monitoring and probably scheduling as well. - // These fields will be useful for scheduling, error reporting (e.g. disk - // exceeded) and for log analysis. - int32 total_input_file_count = 3; - int64 total_input_file_bytes = 4; - // Used for monitoring and scheduling. - BuildInfo build_info = 5; - // Timeout milliseconds for running this action. - int64 timeout_millis = 6; // Maybe add io_timeout as well, per - // Marc Antoine's suggestion. - // Add other fields such as required cores, RAM, etc, that affect scheduling, - // but not result, later. All the requirements that DO affect results should - // be part of the Action. -} - -message BuildInfo { - string build_id = 1; - // TBD, Fields used to identify a build action. - // This will be useful for analysis purposes. - // Note: we don't want to put any Bazel-specific fields in this. -} - -message ExecutionStats { -// TBD, will contain all the stats related to the execution: -// time it took, resources, etc. Maybe will break into sub-messages -// for various execution phases. -} - -message ExecuteReply { - ExecutionStatus status = 1; - ActionResult result = 2; - bool cached_result = 3; // Filled by the server on Execution Cache hit. - ExecutionStats execution_stats = 4; - CasStatus cas_error = 5; // A possible server-side CAS error, e.g. missing - // inputs. The message contains the missing digests. - // Later will introduce return AttemptHistory for monitoring and use - // in requests. -} - -message ExecutionStatus { - bool executed = 1; // Whether the action was executed. - bool succeeded = 2; // Whether the action succeeded. - enum ErrorCode { - UNKNOWN_ERROR = 0; - MISSING_COMMAND = 1; // Missing command digest in CAS. - MISSING_INPUT = 2; // Missing one of the inputs in CAS. - DEADLINE_EXCEEDED = 3; - EXEC_FAILED = 4; // Action returned non-zero. - // Other server errors. Some of these errors are client-retriable, and some - // not; will have to comment clearly what will happen on each error. - } - ErrorCode error = 3; - string error_detail = 4; - // These fields allow returning streaming statuses for the action progress. - enum ActionStage { - UNKNOWN_STAGE = 0; - QUEUED = 1; - EXECUTING = 2; - FINISHED = 3; - } - ActionStage stage = 5; - // Optionally will add more details pertaining to current stage, for example - // time executing, or position in queue, etc. -} diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD index 24e0835694..87c71572c2 100644 --- a/src/test/java/com/google/devtools/build/lib/BUILD +++ b/src/test/java/com/google/devtools/build/lib/BUILD @@ -1068,8 +1068,6 @@ java_test( "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/common/options", - "//src/main/protobuf:remote_protocol_java_grpc", - "//src/main/protobuf:remote_protocol_java_proto", "//third_party:api_client", "//third_party:guava", "//third_party:junit4", @@ -1077,6 +1075,13 @@ java_test( "//third_party:truth", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", + "@googleapis//:google_bytestream_bytestream_java_grpc", + "@googleapis//:google_bytestream_bytestream_java_proto", + "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc", + "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto", + "@googleapis//:google_longrunning_operations_java_proto", + "@googleapis//:google_rpc_code_java_proto", + "@googleapis//:google_rpc_status_java_proto", ], ) diff --git a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java index c7edee6ed0..ce445f7d4e 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java @@ -14,7 +14,9 @@ package com.google.devtools.build.lib.remote; import static com.google.common.truth.Truth.assertThat; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -35,9 +37,7 @@ import com.google.devtools.build.lib.exec.SpawnRunner; import com.google.devtools.build.lib.exec.SpawnRunner.ProgressStatus; import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionPolicy; import com.google.devtools.build.lib.exec.util.FakeOwner; -import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.FileSystemUtils; @@ -45,9 +45,10 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.devtools.common.options.Options; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Digest; import com.google.protobuf.ByteString; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.SortedMap; import org.junit.Before; @@ -55,110 +56,99 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.mockito.MockitoAnnotations.Mock; /** Tests for {@link CachedLocalSpawnRunner}. */ @RunWith(JUnit4.class) public class CachedLocalSpawnRunnerTest { - private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = new ArtifactExpander() { - @Override - public void expand(Artifact artifact, Collection<? super Artifact> output) { - output.add(artifact); - } - }; - - private static final ContentDigest DIGEST_FOR_EMPTY = ContentDigests.computeDigest(new byte[0]); + private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = + new ArtifactExpander() { + @Override + public void expand(Artifact artifact, Collection<? super Artifact> output) { + output.add(artifact); + } + }; private FileSystem fs; private Path execRoot; private SimpleSpawn simpleSpawn; private FakeActionInputFileCache fakeFileCache; - + @Mock private RemoteActionCache cache; + @Mock private SpawnRunner delegate; + CachedLocalSpawnRunner runner; private FileOutErr outErr; - private final SpawnExecutionPolicy simplePolicy = new SpawnExecutionPolicy() { - @Override - public void lockOutputFiles() throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public ActionInputFileCache getActionInputFileCache() { - return fakeFileCache; - } - - @Override - public long getTimeoutMillis() { - return 0; - } - - @Override - public FileOutErr getFileOutErr() { - return outErr; - } - - @Override - public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException { - return new SpawnInputExpander(/*strict*/false) - .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace"); - } - - @Override - public void report(ProgressStatus state) { - // TODO(ulfjack): Test that the right calls are made. - } - }; + private final SpawnExecutionPolicy simplePolicy = + new SpawnExecutionPolicy() { + @Override + public void lockOutputFiles() throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public ActionInputFileCache getActionInputFileCache() { + return fakeFileCache; + } + + @Override + public long getTimeoutMillis() { + return 0; + } + + @Override + public FileOutErr getFileOutErr() { + return outErr; + } + + @Override + public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException { + return new SpawnInputExpander(/*strict*/ false) + .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace"); + } + + @Override + public void report(ProgressStatus state) { + // TODO(ulfjack): Test that the right calls are made. + } + }; @Before public final void setUp() throws Exception { + MockitoAnnotations.initMocks(this); fs = new InMemoryFileSystem(); execRoot = fs.getPath("/exec/root"); FileSystemUtils.createDirectoryAndParents(execRoot); fakeFileCache = new FakeActionInputFileCache(execRoot); - simpleSpawn = new SimpleSpawn( - new FakeOwner("Mnemonic", "Progress Message"), - ImmutableList.of("/bin/echo", "Hi!"), - ImmutableMap.of("VARIABLE", "value"), - /*executionInfo=*/ImmutableMap.<String, String>of(), - /*inputs=*/ImmutableList.of(ActionInputHelper.fromPath("input")), - /*outputs=*/ImmutableList.<ActionInput>of(), - ResourceSet.ZERO - ); + simpleSpawn = + new SimpleSpawn( + new FakeOwner("Mnemonic", "Progress Message"), + ImmutableList.of("/bin/echo", "Hi!"), + ImmutableMap.of("VARIABLE", "value"), + /*executionInfo=*/ ImmutableMap.<String, String>of(), + /*inputs=*/ ImmutableList.of(ActionInputHelper.fromPath("input")), + /*outputs=*/ ImmutableList.<ActionInput>of(), + ResourceSet.ZERO); Path stdout = fs.getPath("/tmp/stdout"); Path stderr = fs.getPath("/tmp/stderr"); FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory()); FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory()); outErr = new FileOutErr(stdout, stderr); - } - - private void scratch(ActionInput input, String content) throws IOException { - Path inputFile = execRoot.getRelative(input.getExecPath()); - FileSystemUtils.writeContentAsLatin1(inputFile, content); - fakeFileCache.setDigest( - simpleSpawn.getInputFiles().get(0), ByteString.copyFrom(inputFile.getSHA1Digest())); + RemoteOptions options = Options.getDefaults(RemoteOptions.class); + runner = new CachedLocalSpawnRunner(execRoot, options, cache, delegate); + fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz"); } @SuppressWarnings("unchecked") @Test public void cacheHit() throws Exception { - RemoteOptions options = Options.getDefaults(RemoteOptions.class); - RemoteActionCache cache = Mockito.mock(RemoteActionCache.class); - SpawnRunner delegate = Mockito.mock(SpawnRunner.class); - CachedLocalSpawnRunner runner = - new CachedLocalSpawnRunner(execRoot, options, cache, delegate); - when(cache.getCachedActionResult(any(ActionKey.class))) - .thenReturn(ActionResult.newBuilder().setReturnCode(0).build()); - when(cache.downloadBlobs(any(Iterable.class))) - .thenReturn(ImmutableList.of(new byte[0], new byte[0])); - - scratch(simpleSpawn.getInputFiles().get(0), "xyz"); + ActionResult actionResult = ActionResult.getDefaultInstance(); + when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult); SpawnResult result = runner.exec(simpleSpawn, simplePolicy); - // We use verify to check that each method is called exactly once. - // TODO(ulfjack): Check that we also call it with exactly the right parameters, not just any. - verify(cache).getCachedActionResult(any(ActionKey.class)); - verify(cache).downloadAllResults(any(ActionResult.class), any(Path.class)); - verify(cache).downloadBlobs(any(Iterable.class)); + // In line with state-based testing, we only verify calls that produce side effects. + verify(cache).downloadAllResults(actionResult, execRoot); assertThat(result.setupSuccess()).isTrue(); assertThat(result.exitCode()).isEqualTo(0); assertThat(outErr.hasRecordedOutput()).isFalse(); @@ -168,34 +158,42 @@ public class CachedLocalSpawnRunnerTest { @SuppressWarnings("unchecked") @Test public void cacheHitWithOutput() throws Exception { - RemoteOptions options = Options.getDefaults(RemoteOptions.class); - RemoteActionCache cache = Mockito.mock(RemoteActionCache.class); - SpawnRunner delegate = Mockito.mock(SpawnRunner.class); - CachedLocalSpawnRunner runner = - new CachedLocalSpawnRunner(execRoot, options, cache, delegate); - when(cache.getCachedActionResult(any(ActionKey.class))) - .thenReturn(ActionResult.newBuilder().setReturnCode(0).build()); - - scratch(simpleSpawn.getInputFiles().get(0), "xyz"); - byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8); - byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8); - ContentDigest stdOutDigest = ContentDigests.computeDigest(cacheStdOut); - ContentDigest stdErrDigest = ContentDigests.computeDigest(cacheStdErr); - - ActionResult actionResult = ActionResult.newBuilder() - .setReturnCode(0) - .setStdoutDigest(stdOutDigest) - .setStderrDigest(stdErrDigest) - .build(); + byte[] cacheStdOut = "stdout".getBytes(UTF_8); + byte[] cacheStdErr = "stderr".getBytes(UTF_8); + Digest stdOutDigest = Digests.computeDigest(cacheStdOut); + Digest stdErrDigest = Digests.computeDigest(cacheStdErr); + + ActionResult actionResult = + ActionResult.newBuilder() + .setStdoutDigest(stdOutDigest) + .setStderrDigest(stdErrDigest) + .build(); when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult); - when(cache.downloadBlobs(any(Iterable.class))) - .thenReturn(ImmutableList.of(cacheStdOut, cacheStdErr)); + when(cache.downloadBlob(stdOutDigest)).thenReturn(cacheStdOut); + when(cache.downloadBlob(stdErrDigest)).thenReturn(cacheStdErr); SpawnResult result = runner.exec(simpleSpawn, simplePolicy); - // We use verify to check that each method is called exactly once. - verify(cache).getCachedActionResult(any(ActionKey.class)); - verify(cache).downloadAllResults(any(ActionResult.class), any(Path.class)); - verify(cache).downloadBlobs(any(Iterable.class)); + // In line with state-based testing, we only verify calls that produce side effects. + verify(cache).downloadAllResults(actionResult, execRoot); + assertThat(result.setupSuccess()).isTrue(); + assertThat(result.exitCode()).isEqualTo(0); + assertThat(outErr.outAsLatin1()).isEqualTo("stdout"); + assertThat(outErr.errAsLatin1()).isEqualTo("stderr"); + } + + @SuppressWarnings("unchecked") + @Test + public void cacheHitWithOutputsInlined() throws Exception { + ActionResult actionResult = + ActionResult.newBuilder() + .setStdoutRaw(ByteString.copyFromUtf8("stdout")) + .setStderrRaw(ByteString.copyFromUtf8("stderr")) + .build(); + when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult); + + SpawnResult result = runner.exec(simpleSpawn, simplePolicy); + // In line with state-based testing, we only verify calls that produce side effects. + verify(cache).downloadAllResults(actionResult, execRoot); assertThat(result.setupSuccess()).isTrue(); assertThat(result.exitCode()).isEqualTo(0); assertThat(outErr.outAsLatin1()).isEqualTo("stdout"); @@ -205,33 +203,29 @@ public class CachedLocalSpawnRunnerTest { @SuppressWarnings("unchecked") @Test public void cacheMiss() throws Exception { - RemoteOptions options = Options.getDefaults(RemoteOptions.class); - RemoteActionCache cache = Mockito.mock(RemoteActionCache.class); - SpawnRunner delegate = Mockito.mock(SpawnRunner.class); - CachedLocalSpawnRunner runner = - new CachedLocalSpawnRunner(execRoot, options, cache, delegate); - when(cache.getCachedActionResult(any(ActionKey.class))) - .thenReturn(ActionResult.newBuilder().setReturnCode(0).build()); - - scratch(simpleSpawn.getInputFiles().get(0), "xyz"); - - when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null); - when(cache.uploadFileContents(any(Path.class))).thenReturn(DIGEST_FOR_EMPTY); - SpawnResult delegateResult = new SpawnResult.Builder() - .setExitCode(0) - .setStatus(Status.SUCCESS) - .build(); + byte[] cacheStdOut = "stdout".getBytes(UTF_8); + byte[] cacheStdErr = "stderr".getBytes(UTF_8); + Digest stdOutDigest = Digests.computeDigest(cacheStdOut); + Digest stdErrDigest = Digests.computeDigest(cacheStdErr); + when(cache.uploadFileContents(any(Path.class))).thenReturn(stdErrDigest, stdOutDigest); + SpawnResult delegateResult = + new SpawnResult.Builder().setExitCode(0).setStatus(Status.SUCCESS).build(); when(delegate.exec(any(Spawn.class), any(SpawnExecutionPolicy.class))) .thenReturn(delegateResult); SpawnResult result = runner.exec(simpleSpawn, simplePolicy); + assertThat(result.setupSuccess()).isTrue(); + assertThat(result.exitCode()).isEqualTo(0); // We use verify to check that each method is called the correct number of times. verify(cache) .uploadAllResults(any(Path.class), any(Collection.class), any(ActionResult.Builder.class)); // Two additional uploads for stdout and stderr. verify(cache, Mockito.times(2)).uploadFileContents(any(Path.class)); - verify(cache).setCachedActionResult(any(ActionKey.class), any(ActionResult.class)); - assertThat(result.setupSuccess()).isTrue(); - assertThat(result.exitCode()).isEqualTo(0); + ActionResult actionResult = + ActionResult.newBuilder() + .setStdoutDigest(stdOutDigest) + .setStderrDigest(stdErrDigest) + .build(); + verify(cache).setCachedActionResult(any(ActionKey.class), eq(actionResult)); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java index 13be580204..b51b9d6259 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java @@ -17,9 +17,8 @@ import static com.google.common.truth.Truth.assertThat; import static java.nio.charset.StandardCharsets.UTF_8; import com.google.common.collect.ImmutableSet; -import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.protobuf.ByteString; +import com.google.devtools.build.lib.remote.Chunker.Chunk; +import com.google.devtools.remoteexecution.v1test.Digest; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -28,37 +27,29 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class ChunkerTest { - static BlobChunk buildChunk(long offset, String data) { - return BlobChunk.newBuilder().setOffset(offset).setData(ByteString.copyFromUtf8(data)).build(); - } - - static BlobChunk buildChunk(ContentDigest digest, String data) { - return BlobChunk.newBuilder().setDigest(digest).setData(ByteString.copyFromUtf8(data)).build(); - } - @Test public void testChunker() throws Exception { byte[] b1 = "abcdefg".getBytes(UTF_8); byte[] b2 = "hij".getBytes(UTF_8); byte[] b3 = "klmnopqrstuvwxyz".getBytes(UTF_8); - ContentDigest d1 = ContentDigests.computeDigest(b1); - ContentDigest d2 = ContentDigests.computeDigest(b2); - ContentDigest d3 = ContentDigests.computeDigest(b3); + Digest d1 = Digests.computeDigest(b1); + Digest d2 = Digests.computeDigest(b2); + Digest d3 = Digests.computeDigest(b3); Chunker c = new Chunker.Builder().chunkSize(5).addInput(b1).addInput(b2).addInput(b3).build(); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(d1, "abcde")); + assertThat(c.next()).isEqualTo(new Chunk(d1, "abcde".getBytes(UTF_8), 0)); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(5, "fg")); + assertThat(c.next()).isEqualTo(new Chunk(d1, "fg".getBytes(UTF_8), 5)); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(d2, "hij")); + assertThat(c.next()).isEqualTo(new Chunk(d2, "hij".getBytes(UTF_8), 0)); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(d3, "klmno")); + assertThat(c.next()).isEqualTo(new Chunk(d3, "klmno".getBytes(UTF_8), 0)); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(5, "pqrst")); + assertThat(c.next()).isEqualTo(new Chunk(d3, "pqrst".getBytes(UTF_8), 5)); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(10, "uvwxy")); + assertThat(c.next()).isEqualTo(new Chunk(d3, "uvwxy".getBytes(UTF_8), 10)); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(15, "z")); + assertThat(c.next()).isEqualTo(new Chunk(d3, "z".getBytes(UTF_8), 15)); assertThat(c.hasNext()).isFalse(); } @@ -68,8 +59,8 @@ public class ChunkerTest { byte[] b2 = "bb".getBytes(UTF_8); byte[] b3 = "ccc".getBytes(UTF_8); byte[] b4 = "dddd".getBytes(UTF_8); - ContentDigest d1 = ContentDigests.computeDigest(b1); - ContentDigest d3 = ContentDigests.computeDigest(b3); + Digest d1 = Digests.computeDigest(b1); + Digest d3 = Digests.computeDigest(b3); Chunker c = new Chunker.Builder() .chunkSize(2) @@ -80,11 +71,11 @@ public class ChunkerTest { .addInput(b4) .build(); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(d1, "a")); + assertThat(c.next()).isEqualTo(new Chunk(d1, "a".getBytes(UTF_8), 0)); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(d3, "cc")); + assertThat(c.next()).isEqualTo(new Chunk(d3, "cc".getBytes(UTF_8), 0)); assertThat(c.hasNext()).isTrue(); - assertThat(c.next()).isEqualTo(buildChunk(2, "c")); + assertThat(c.next()).isEqualTo(new Chunk(d3, "c".getBytes(UTF_8), 2)); assertThat(c.hasNext()).isFalse(); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java b/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java index 74994aabdd..3193e0b66f 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java +++ b/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java @@ -20,7 +20,9 @@ import com.google.common.hash.HashCode; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.actions.Artifact; +import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.remoteexecution.v1test.Digest; import com.google.protobuf.ByteString; import java.io.IOException; import javax.annotation.Nullable; @@ -28,20 +30,21 @@ import javax.annotation.Nullable; /** A fake implementation of the {@link ActionInputFileCache} interface. */ final class FakeActionInputFileCache implements ActionInputFileCache { private final Path execRoot; - private final BiMap<ActionInput, ByteString> cas = HashBiMap.create(); + private final BiMap<ActionInput, String> cas = HashBiMap.create(); FakeActionInputFileCache(Path execRoot) { this.execRoot = execRoot; } - void setDigest(ActionInput input, ByteString digest) { + void setDigest(ActionInput input, String digest) { cas.put(input, digest); } @Override @Nullable public byte[] getDigest(ActionInput input) throws IOException { - return Preconditions.checkNotNull(cas.get(input), input).toByteArray(); + String hexDigest = Preconditions.checkNotNull(cas.get(input), input); + return HashCode.fromString(hexDigest).asBytes(); } @Override @@ -63,12 +66,20 @@ final class FakeActionInputFileCache implements ActionInputFileCache { @Nullable public ActionInput getInputFromDigest(ByteString hexDigest) { HashCode code = HashCode.fromString(hexDigest.toStringUtf8()); - ByteString digest = ByteString.copyFrom(code.asBytes()); - return Preconditions.checkNotNull(cas.inverse().get(digest)); + return Preconditions.checkNotNull(cas.inverse().get(code.toString())); } @Override public Path getInputPath(ActionInput input) { throw new UnsupportedOperationException(); } + + public Digest createScratchInput(ActionInput input, String content) throws IOException { + Path inputFile = execRoot.getRelative(input.getExecPath()); + FileSystemUtils.createDirectoryAndParents(inputFile.getParentDirectory()); + FileSystemUtils.writeContentAsLatin1(inputFile, content); + Digest digest = Digests.computeDigest(inputFile); + setDigest(input, digest.getHash()); + return digest; + } }
\ No newline at end of file diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java new file mode 100644 index 0000000000..4de1498207 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java @@ -0,0 +1,56 @@ +// Copyright 2017 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; +import com.google.bytestream.ByteStreamProto.ReadRequest; +import com.google.bytestream.ByteStreamProto.ReadResponse; +import com.google.common.collect.ImmutableMap; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.protobuf.ByteString; +import io.grpc.stub.StreamObserver; +import java.util.Map; + +class FakeImmutableCacheByteStreamImpl extends ByteStreamImplBase { + private final Map<ReadRequest, ReadResponse> cannedReplies; + + public FakeImmutableCacheByteStreamImpl(Map<Digest, String> contents) { + ImmutableMap.Builder<ReadRequest, ReadResponse> b = ImmutableMap.builder(); + for (Map.Entry<Digest, String> e : contents.entrySet()) { + b.put( + ReadRequest.newBuilder() + .setResourceName("blobs/" + e.getKey().getHash() + "/" + e.getKey().getSizeBytes()) + .build(), + ReadResponse.newBuilder().setData(ByteString.copyFromUtf8(e.getValue())).build()); + } + cannedReplies = b.build(); + } + + public FakeImmutableCacheByteStreamImpl(Digest digest, String contents) { + this(ImmutableMap.of(digest, contents)); + } + + public FakeImmutableCacheByteStreamImpl(Digest d1, String c1, Digest d2, String c2) { + this(ImmutableMap.of(d1, c1, d2, c2)); + } + + @Override + public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { + assertThat(cannedReplies.containsKey(request)).isTrue(); + responseObserver.onNext(cannedReplies.get(request)); + responseObserver.onCompleted(); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java index c72e819820..3cb2bd218c 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java @@ -15,78 +15,81 @@ package com.google.devtools.build.lib.remote; import static com.google.common.truth.Truth.assertThat; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; import com.google.api.client.json.GenericJson; import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; +import com.google.bytestream.ByteStreamProto.ReadRequest; +import com.google.bytestream.ByteStreamProto.ReadResponse; +import com.google.bytestream.ByteStreamProto.WriteRequest; +import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.devtools.build.lib.actions.Root; +import com.google.devtools.build.lib.actions.ActionInputHelper; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; import com.google.devtools.build.lib.testutil.Scratch; -import com.google.devtools.build.lib.util.Preconditions; +import com.google.devtools.build.lib.vfs.FileSystem; +import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.devtools.common.options.Options; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse; import com.google.protobuf.ByteString; import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; -import io.grpc.ManagedChannel; import io.grpc.MethodDescriptor; import io.grpc.Server; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; +import io.grpc.util.MutableHandlerRegistry; import java.io.IOException; -import java.util.concurrent.ConcurrentMap; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.MockitoAnnotations; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** Tests for {@link GrpcActionCache}. */ @RunWith(JUnit4.class) public class GrpcActionCacheTest { - private final FakeRemoteCacheService fakeRemoteCacheService = new FakeRemoteCacheService(); - - private final Server server = - InProcessServerBuilder.forName(getClass().getSimpleName()) - .directExecutor() - .addService(fakeRemoteCacheService) - .build(); - - private final ManagedChannel channel = - InProcessChannelBuilder.forName(getClass().getSimpleName()).directExecutor().build(); - private Scratch scratch; - private Root rootDir; + private FileSystem fs; + private Path execRoot; + private FakeActionInputFileCache fakeFileCache; + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); + private final String fakeServerName = "fake server for " + getClass(); + private Server fakeServer; @Before public final void setUp() throws Exception { - MockitoAnnotations.initMocks(this); - scratch = new Scratch(); - rootDir = Root.asDerivedRoot(scratch.dir("/exec/root")); - server.start(); + // Use a mutable service registry for later registering the service impl for each test case. + fakeServer = + InProcessServerBuilder.forName(fakeServerName) + .fallbackHandlerRegistry(serviceRegistry) + .directExecutor() + .build() + .start(); + Chunker.setDefaultChunkSizeForTesting(1000); // Enough for everything to be one chunk. + fs = new InMemoryFileSystem(); + execRoot = fs.getPath("/exec/root"); + FileSystemUtils.createDirectoryAndParents(execRoot); + fakeFileCache = new FakeActionInputFileCache(execRoot); } @After - public void tearDown() { - server.shutdownNow(); - channel.shutdownNow(); + public void tearDown() throws Exception { + fakeServer.shutdownNow(); } private static class ChannelOptionsInterceptor implements ClientInterceptor { @@ -106,316 +109,332 @@ public class GrpcActionCacheTest { } private GrpcActionCache newClient() throws IOException { - return newClient(Options.getDefaults(RemoteOptions.class), - Options.getDefaults(AuthAndTLSOptions.class)); - } + AuthAndTLSOptions authTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + authTlsOptions.authEnabled = true; + authTlsOptions.authCredentials = "/exec/root/creds.json"; + authTlsOptions.authScope = "dummy.scope"; + + GenericJson json = new GenericJson(); + json.put("type", "authorized_user"); + json.put("client_id", "some_client"); + json.put("client_secret", "foo"); + json.put("refresh_token", "bar"); + Scratch scratch = new Scratch(); + scratch.file(authTlsOptions.authCredentials, new JacksonFactory().toString(json)); - private GrpcActionCache newClient(RemoteOptions remoteOptions, AuthAndTLSOptions authTlsOptions) - throws IOException { ChannelOptions channelOptions = - authTlsOptions.authCredentials != null - ? ChannelOptions.create( - authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes, - scratch.resolve(authTlsOptions.authCredentials).getInputStream()) - : ChannelOptions.create(authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes); + ChannelOptions.create( + authTlsOptions, scratch.resolve(authTlsOptions.authCredentials).getInputStream()); return new GrpcActionCache( ClientInterceptors.intercept( - channel, ImmutableList.of(new ChannelOptionsInterceptor(channelOptions))), - remoteOptions, - channelOptions); + InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(), + ImmutableList.of(new ChannelOptionsInterceptor(channelOptions))), + channelOptions, + Options.getDefaults(RemoteOptions.class)); } @Test - public void testDownloadEmptyBlobs() throws Exception { + public void testDownloadEmptyBlob() throws Exception { GrpcActionCache client = newClient(); - ContentDigest fooDigest = fakeRemoteCacheService.put("foo".getBytes(UTF_8)); - ContentDigest emptyDigest = ContentDigests.computeDigest(new byte[0]); - ImmutableList<byte[]> results = - client.downloadBlobs(ImmutableList.<ContentDigest>of(emptyDigest, fooDigest, emptyDigest)); - // Will not query the server for empty blobs. - assertThat(new String(results.get(0), UTF_8)).isEmpty(); - assertThat(new String(results.get(1), UTF_8)).isEqualTo("foo"); - assertThat(new String(results.get(2), UTF_8)).isEmpty(); - // Will not call the server at all. - assertThat(new String(client.downloadBlob(emptyDigest), UTF_8)).isEmpty(); + Digest emptyDigest = Digests.computeDigest(new byte[0]); + // Will not call the mock Bytestream interface at all. + assertThat(client.downloadBlob(emptyDigest)).isEmpty(); } @Test - public void testDownloadBlobs() throws Exception { - GrpcActionCache client = newClient(); - ContentDigest fooDigest = fakeRemoteCacheService.put("foo".getBytes(UTF_8)); - ContentDigest barDigest = fakeRemoteCacheService.put("bar".getBytes(UTF_8)); - ImmutableList<byte[]> results = - client.downloadBlobs(ImmutableList.<ContentDigest>of(fooDigest, barDigest)); - assertThat(new String(results.get(0), UTF_8)).isEqualTo("foo"); - assertThat(new String(results.get(1), UTF_8)).isEqualTo("bar"); + public void testDownloadBlobSingleChunk() throws Exception { + final GrpcActionCache client = newClient(); + final Digest digest = Digests.computeDigestUtf8("abcdefg"); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { + assertThat(request.getResourceName().contains(digest.getHash())).isTrue(); + responseObserver.onNext( + ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abcdefg")).build()); + responseObserver.onCompleted(); + } + }); + assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg"); } @Test - public void testDownloadBlobsBatchChunk() throws Exception { - RemoteOptions options = Options.getDefaults(RemoteOptions.class); - options.grpcMaxBatchInputs = 10; - options.grpcMaxChunkSizeBytes = 2; - options.grpcMaxBatchSizeBytes = 10; - options.remoteTimeout = 10; - GrpcActionCache client = newClient(options, Options.getDefaults(AuthAndTLSOptions.class)); - ContentDigest fooDigest = fakeRemoteCacheService.put("fooooooo".getBytes(UTF_8)); - ContentDigest barDigest = fakeRemoteCacheService.put("baaaar".getBytes(UTF_8)); - ContentDigest s1Digest = fakeRemoteCacheService.put("1".getBytes(UTF_8)); - ContentDigest s2Digest = fakeRemoteCacheService.put("2".getBytes(UTF_8)); - ContentDigest s3Digest = fakeRemoteCacheService.put("3".getBytes(UTF_8)); - ImmutableList<byte[]> results = - client.downloadBlobs( - ImmutableList.<ContentDigest>of(fooDigest, barDigest, s1Digest, s2Digest, s3Digest)); - assertThat(new String(results.get(0), UTF_8)).isEqualTo("fooooooo"); - assertThat(new String(results.get(1), UTF_8)).isEqualTo("baaaar"); - assertThat(new String(results.get(2), UTF_8)).isEqualTo("1"); - assertThat(new String(results.get(3), UTF_8)).isEqualTo("2"); - assertThat(new String(results.get(4), UTF_8)).isEqualTo("3"); + public void testDownloadBlobMultipleChunks() throws Exception { + final GrpcActionCache client = newClient(); + final Digest digest = Digests.computeDigestUtf8("abcdefg"); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { + assertThat(request.getResourceName().contains(digest.getHash())).isTrue(); + responseObserver.onNext( + ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build()); + responseObserver.onNext( + ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build()); + responseObserver.onNext( + ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("g")).build()); + responseObserver.onCompleted(); + } + }); + assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg"); } @Test - public void testUploadBlobs() throws Exception { + public void testDownloadAllResults() throws Exception { GrpcActionCache client = newClient(); - byte[] foo = "foo".getBytes(UTF_8); - byte[] bar = "bar".getBytes(UTF_8); - ContentDigest fooDigest = ContentDigests.computeDigest(foo); - ContentDigest barDigest = ContentDigests.computeDigest(bar); - ImmutableList<ContentDigest> digests = client.uploadBlobs(ImmutableList.<byte[]>of(foo, bar)); - assertThat(digests).containsExactly(fooDigest, barDigest); - assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo); - assertThat(fakeRemoteCacheService.get(barDigest)).isEqualTo(bar); - } + Digest fooDigest = Digests.computeDigestUtf8("foo-contents"); + Digest barDigest = Digests.computeDigestUtf8("bar-contents"); + Digest emptyDigest = Digests.computeDigest(new byte[0]); + serviceRegistry.addService( + new FakeImmutableCacheByteStreamImpl(fooDigest, "foo-contents", barDigest, "bar-contents")); - @Test - public void testUploadBlobsBatchChunk() throws Exception { - RemoteOptions options = Options.getDefaults(RemoteOptions.class); - options.grpcMaxBatchInputs = 10; - options.grpcMaxChunkSizeBytes = 2; - options.grpcMaxBatchSizeBytes = 10; - options.remoteTimeout = 10; - GrpcActionCache client = newClient(options, Options.getDefaults(AuthAndTLSOptions.class)); - - byte[] foo = "fooooooo".getBytes(UTF_8); - byte[] bar = "baaaar".getBytes(UTF_8); - byte[] s1 = "1".getBytes(UTF_8); - byte[] s2 = "2".getBytes(UTF_8); - byte[] s3 = "3".getBytes(UTF_8); - ContentDigest fooDigest = ContentDigests.computeDigest(foo); - ContentDigest barDigest = ContentDigests.computeDigest(bar); - ContentDigest s1Digest = ContentDigests.computeDigest(s1); - ContentDigest s2Digest = ContentDigests.computeDigest(s2); - ContentDigest s3Digest = ContentDigests.computeDigest(s3); - ImmutableList<ContentDigest> digests = - client.uploadBlobs(ImmutableList.<byte[]>of(foo, bar, s1, s2, s3)); - assertThat(digests).containsExactly(fooDigest, barDigest, s1Digest, s2Digest, s3Digest); - assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo); - assertThat(fakeRemoteCacheService.get(barDigest)).isEqualTo(bar); - assertThat(fakeRemoteCacheService.get(s1Digest)).isEqualTo(s1); - assertThat(fakeRemoteCacheService.get(s2Digest)).isEqualTo(s2); - assertThat(fakeRemoteCacheService.get(s3Digest)).isEqualTo(s3); - } - - @Test - public void testUploadAllResults() throws Exception { - GrpcActionCache client = newClient(); - byte[] foo = "foo".getBytes(UTF_8); - byte[] bar = "bar".getBytes(UTF_8); - Path fooFile = scratch.file("/exec/root/a/foo", foo); - Path emptyFile = scratch.file("/exec/root/b/empty"); - Path barFile = scratch.file("/exec/root/a/bar", bar); - ContentDigest fooDigest = ContentDigests.computeDigest(fooFile); - ContentDigest barDigest = ContentDigests.computeDigest(barFile); - ContentDigest emptyDigest = ContentDigests.computeDigest(new byte[0]); ActionResult.Builder result = ActionResult.newBuilder(); - client.uploadAllResults( - rootDir.getPath(), ImmutableList.<Path>of(fooFile, emptyFile, barFile), result); - assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo); - assertThat(fakeRemoteCacheService.get(barDigest)).isEqualTo(bar); - ActionResult.Builder expectedResult = ActionResult.newBuilder(); - expectedResult - .addOutputBuilder() - .setPath("a/foo") - .getFileMetadataBuilder() - .setDigest(fooDigest); - expectedResult - .addOutputBuilder() - .setPath("b/empty") - .getFileMetadataBuilder() - .setDigest(emptyDigest); - expectedResult - .addOutputBuilder() - .setPath("a/bar") - .getFileMetadataBuilder() - .setDigest(barDigest); - assertThat(result.build()).isEqualTo(expectedResult.build()); + result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); + result.addOutputFilesBuilder().setPath("b/empty").setDigest(emptyDigest); + result.addOutputFilesBuilder().setPath("a/bar").setDigest(barDigest).setIsExecutable(true); + client.downloadAllResults(result.build(), execRoot); + assertThat(Digests.computeDigest(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest); + assertThat(Digests.computeDigest(execRoot.getRelative("b/empty"))).isEqualTo(emptyDigest); + assertThat(Digests.computeDigest(execRoot.getRelative("a/bar"))).isEqualTo(barDigest); + assertThat(execRoot.getRelative("a/bar").isExecutable()).isTrue(); } @Test - public void testDownloadAllResults() throws Exception { - GrpcActionCache client = newClient(); - ContentDigest fooDigest = fakeRemoteCacheService.put("foo".getBytes(UTF_8)); - ContentDigest barDigest = fakeRemoteCacheService.put("bar".getBytes(UTF_8)); - ContentDigest emptyDigest = ContentDigests.computeDigest(new byte[0]); - ActionResult.Builder result = ActionResult.newBuilder(); - result.addOutputBuilder().setPath("a/foo").getFileMetadataBuilder().setDigest(fooDigest); - result.addOutputBuilder().setPath("b/empty").getFileMetadataBuilder().setDigest(emptyDigest); - result.addOutputBuilder().setPath("a/bar").getFileMetadataBuilder().setDigest(barDigest); - client.downloadAllResults(result.build(), rootDir.getPath()); - Path fooFile = rootDir.getPath().getRelative("a/foo"); - Path emptyFile = rootDir.getPath().getRelative("b/empty"); - Path barFile = rootDir.getPath().getRelative("a/bar"); - assertThat(ContentDigests.computeDigest(fooFile)).isEqualTo(fooDigest); - assertThat(ContentDigests.computeDigest(emptyFile)).isEqualTo(emptyDigest); - assertThat(ContentDigests.computeDigest(barFile)).isEqualTo(barDigest); + public void testUploadBlobCacheHit() throws Exception { + final GrpcActionCache client = newClient(); + final Digest digest = Digests.computeDigestUtf8("abcdefg"); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver<FindMissingBlobsResponse> responseObserver) { + responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + }); + assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest); } @Test - public void testAuthCredentials() throws Exception { - AuthAndTLSOptions options = Options.getDefaults(AuthAndTLSOptions.class); - options.authEnabled = true; - options.authCredentials = "/exec/root/creds.json"; - options.authScope = "dummy.scope"; - - GenericJson json = new GenericJson(); - json.put("type", "authorized_user"); - json.put("client_id", "some_client"); - json.put("client_secret", "foo"); - json.put("refresh_token", "bar"); - scratch.file(options.authCredentials, new JacksonFactory().toString(json)); - - GrpcActionCache client = newClient(Options.getDefaults(RemoteOptions.class), options); - byte[] foo = "foo".getBytes(UTF_8); - ContentDigest fooDigest = ContentDigests.computeDigest(foo); - ImmutableList<ContentDigest> digests = client.uploadBlobs(ImmutableList.<byte[]>of(foo)); - assertThat(digests).containsExactly(fooDigest); - assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo); + public void testUploadBlobSingleChunk() throws Exception { + final GrpcActionCache client = newClient(); + final Digest digest = Digests.computeDigestUtf8("abcdefg"); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver<FindMissingBlobsResponse> responseObserver) { + responseObserver.onNext( + FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build()); + responseObserver.onCompleted(); + } + }); + serviceRegistry.addService( + new ByteStreamImplBase() { + @Override + public StreamObserver<WriteRequest> write( + final StreamObserver<WriteResponse> responseObserver) { + return new StreamObserver<WriteRequest>() { + @Override + public void onNext(WriteRequest request) { + assertThat(request.getResourceName()).contains(digest.getHash()); + assertThat(request.getFinishWrite()).isTrue(); + assertThat(request.getData().toStringUtf8()).isEqualTo("abcdefg"); + } + + @Override + public void onCompleted() { + responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(7).build()); + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + fail("An error occurred: " + t); + } + }; + } + }); + assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest); } - private static class FakeRemoteCacheService extends CasServiceImplBase { - private final ConcurrentMap<String, byte[]> cache = Maps.newConcurrentMap(); - - public ContentDigest put(byte[] blob) { - ContentDigest digest = ContentDigests.computeDigest(blob); - cache.put(ContentDigests.toHexString(digest), blob); - return digest; + static class TestChunkedRequestObserver implements StreamObserver<WriteRequest> { + private final StreamObserver<WriteResponse> responseObserver; + private final String contents; + private Chunker chunker; + + public TestChunkedRequestObserver( + StreamObserver<WriteResponse> responseObserver, String contents, int chunkSizeBytes) { + this.responseObserver = responseObserver; + this.contents = contents; + try { + chunker = Chunker.from(contents.getBytes(UTF_8), chunkSizeBytes); + } catch (IOException e) { + fail("An error occurred:" + e); + } } - public byte[] get(ContentDigest digest) { - return cache.get(ContentDigests.toHexString(digest)); + @Override + public void onNext(WriteRequest request) { + assertThat(chunker.hasNext()).isTrue(); + try { + Chunker.Chunk chunk = chunker.next(); + Digest digest = chunk.getDigest(); + long offset = chunk.getOffset(); + byte[] data = chunk.getData(); + if (offset == 0) { + assertThat(request.getResourceName()).contains(digest.getHash()); + } else { + assertThat(request.getResourceName()).isEmpty(); + } + assertThat(request.getFinishWrite()) + .isEqualTo(offset + data.length == digest.getSizeBytes()); + assertThat(request.getData().toByteArray()).isEqualTo(data); + } catch (IOException e) { + fail("An error occurred:" + e); + } } - public void clear() { - cache.clear(); + @Override + public void onCompleted() { + assertThat(chunker.hasNext()).isFalse(); + responseObserver.onNext( + WriteResponse.newBuilder().setCommittedSize(contents.length()).build()); + responseObserver.onCompleted(); } @Override - public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> observer) { - CasLookupReply.Builder reply = CasLookupReply.newBuilder(); - CasStatus.Builder status = reply.getStatusBuilder(); - for (ContentDigest digest : request.getDigestList()) { - if (get(digest) == null) { - status.addMissingDigest(digest); - } - } - status.setSucceeded(true); - observer.onNext(reply.build()); - observer.onCompleted(); + public void onError(Throwable t) { + fail("An error occurred: " + t); } + } - @Override - public void downloadBlob( - CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> observer) { - CasDownloadReply.Builder reply = CasDownloadReply.newBuilder(); - CasStatus.Builder status = reply.getStatusBuilder(); - boolean success = true; - for (ContentDigest digest : request.getDigestList()) { - if (get(digest) == null) { - status.addMissingDigest(digest); - success = false; - } - } - if (!success) { - status.setError(CasStatus.ErrorCode.MISSING_DIGEST); - status.setSucceeded(false); - observer.onNext(reply.build()); - observer.onCompleted(); - return; + private Answer<StreamObserver<WriteRequest>> blobChunkedWriteAnswer( + final String contents, final int chunkSize) { + return new Answer<StreamObserver<WriteRequest>>() { + @Override + @SuppressWarnings("unchecked") + public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) { + return new TestChunkedRequestObserver( + (StreamObserver<WriteResponse>) invocation.getArguments()[0], contents, chunkSize); } - // We change the order on purpose, to test for blobs out of order: - for (ContentDigest digest : Lists.reverse(request.getDigestList())) { - observer.onNext( - CasDownloadReply.newBuilder() - .setStatus(CasStatus.newBuilder().setSucceeded(true)) - .setData( - BlobChunk.newBuilder() - .setDigest(digest) - .setData(ByteString.copyFrom(get(digest)))) - .build()); - } - observer.onCompleted(); + }; + } + + @Test + public void testUploadBlobMultipleChunks() throws Exception { + final Digest digest = Digests.computeDigestUtf8("abcdef"); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver<FindMissingBlobsResponse> responseObserver) { + responseObserver.onNext( + FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build()); + responseObserver.onCompleted(); + } + }); + + ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); + serviceRegistry.addService(mockByteStreamImpl); + for (int chunkSize = 1; chunkSize <= 6; ++chunkSize) { + GrpcActionCache client = newClient(); + Chunker.setDefaultChunkSizeForTesting(chunkSize); + when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject())) + .thenAnswer(blobChunkedWriteAnswer("abcdef", chunkSize)); + assertThat(client.uploadBlob("abcdef".getBytes(UTF_8))).isEqualTo(digest); } + } - @Override - public StreamObserver<CasUploadBlobRequest> uploadBlob( - final StreamObserver<CasUploadBlobReply> responseObserver) { - return new StreamObserver<CasUploadBlobRequest>() { - byte[] blob = null; - ContentDigest digest = null; - long offset = 0; - - @Override - public void onNext(CasUploadBlobRequest request) { - BlobChunk chunk = request.getData(); - try { - if (chunk.hasDigest()) { - // Check if the previous chunk was really done. - Preconditions.checkArgument( - digest == null || offset == 0, - "Missing input chunk for digest %s", - digest == null ? "" : ContentDigests.toString(digest)); - digest = chunk.getDigest(); - blob = new byte[(int) digest.getSizeBytes()]; - } - Preconditions.checkArgument(digest != null, "First chunk contains no digest"); - Preconditions.checkArgument( - offset == chunk.getOffset(), - "Missing input chunk for digest %s", - ContentDigests.toString(digest)); - if (digest.getSizeBytes() > 0) { - chunk.getData().copyTo(blob, (int) offset); - offset = (offset + chunk.getData().size()) % digest.getSizeBytes(); - } - if (offset == 0) { - ContentDigest uploadedDigest = put(blob); - Preconditions.checkArgument( - uploadedDigest.equals(digest), - "Digest mismatch: client sent %s, server computed %s", - ContentDigests.toString(digest), - ContentDigests.toString(uploadedDigest)); - } - } catch (Exception e) { - CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder(); - reply - .getStatusBuilder() - .setSucceeded(false) - .setError( - e instanceof IllegalArgumentException - ? CasStatus.ErrorCode.INVALID_ARGUMENT - : CasStatus.ErrorCode.UNKNOWN) - .setErrorDetail(e.toString()); - responseObserver.onNext(reply.build()); + @Test + public void testUploadAllResultsCacheHits() throws Exception { + final GrpcActionCache client = newClient(); + final Digest fooDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); + final Digest barDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x"); + final Path fooFile = execRoot.getRelative("a/foo"); + final Path barFile = execRoot.getRelative("bar"); + barFile.setExecutable(true); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver<FindMissingBlobsResponse> responseObserver) { + assertThat(request) + .isEqualTo( + FindMissingBlobsRequest.newBuilder() + .addBlobDigests(fooDigest) + .addBlobDigests(barDigest) + .build()); + // Nothing is missing. + responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance()); + responseObserver.onCompleted(); } - } + }); - @Override - public void onError(Throwable t) {} + ActionResult.Builder result = ActionResult.newBuilder(); + client.uploadAllResults(execRoot, ImmutableList.<Path>of(fooFile, barFile), result); + ActionResult.Builder expectedResult = ActionResult.newBuilder(); + expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); + expectedResult + .addOutputFilesBuilder() + .setPath("bar") + .setDigest(barDigest) + .setIsExecutable(true); + assertThat(result.build()).isEqualTo(expectedResult.build()); + } - @Override - public void onCompleted() { - responseObserver.onCompleted(); - } - }; - } + @Test + public void testUploadAllResultsCacheMisses() throws Exception { + final GrpcActionCache client = newClient(); + final Digest fooDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz"); + final Digest barDigest = + fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x"); + final Path fooFile = execRoot.getRelative("a/foo"); + final Path barFile = execRoot.getRelative("bar"); + barFile.setExecutable(true); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver<FindMissingBlobsResponse> responseObserver) { + assertThat(request) + .isEqualTo( + FindMissingBlobsRequest.newBuilder() + .addBlobDigests(fooDigest) + .addBlobDigests(barDigest) + .build()); + // Both are missing. + responseObserver.onNext( + FindMissingBlobsResponse.newBuilder() + .addMissingBlobDigests(fooDigest) + .addMissingBlobDigests(barDigest) + .build()); + responseObserver.onCompleted(); + } + }); + ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); + serviceRegistry.addService(mockByteStreamImpl); + when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject())) + .thenAnswer(blobChunkedWriteAnswer("xyz", 3)) + .thenAnswer(blobChunkedWriteAnswer("x", 1)); + + ActionResult.Builder result = ActionResult.newBuilder(); + client.uploadAllResults(execRoot, ImmutableList.<Path>of(fooFile, barFile), result); + ActionResult.Builder expectedResult = ActionResult.newBuilder(); + expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest); + expectedResult + .addOutputFilesBuilder() + .setPath("bar") + .setDigest(barDigest) + .setIsExecutable(true); + assertThat(result.build()).isEqualTo(expectedResult.build()); } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java index fc195ab3de..c648fae09f 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java @@ -14,12 +14,16 @@ package com.google.devtools.build.lib.remote; import static com.google.common.truth.Truth.assertThat; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.verify; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.fail; import static org.mockito.Mockito.when; +import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; +import com.google.bytestream.ByteStreamProto.WriteRequest; +import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.devtools.build.lib.actions.ActionInput; import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.actions.ActionInputHelper; @@ -32,14 +36,6 @@ import com.google.devtools.build.lib.exec.SpawnResult; import com.google.devtools.build.lib.exec.SpawnRunner.ProgressStatus; import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionPolicy; import com.google.devtools.build.lib.exec.util.FakeOwner; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; import com.google.devtools.build.lib.util.io.FileOutErr; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.FileSystemUtils; @@ -47,118 +43,154 @@ import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.devtools.common.options.Options; +import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.Command; +import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.ExecuteRequest; +import com.google.devtools.remoteexecution.v1test.ExecuteResponse; +import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse; +import com.google.devtools.remoteexecution.v1test.GetActionResultRequest; +import com.google.longrunning.Operation; +import com.google.protobuf.Any; import com.google.protobuf.ByteString; +import com.google.rpc.Code; +import com.google.rpc.Status; +import io.grpc.Channel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.protobuf.StatusProto; +import io.grpc.stub.StreamObserver; +import io.grpc.util.MutableHandlerRegistry; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.Collection; +import java.util.Set; import java.util.SortedMap; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; /** Tests for {@link RemoteSpawnRunner} in combination with {@link GrpcRemoteExecutor}. */ @RunWith(JUnit4.class) public class GrpcRemoteExecutionClientTest { - private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = new ArtifactExpander() { - @Override - public void expand(Artifact artifact, Collection<? super Artifact> output) { - output.add(artifact); - } - }; + private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = + new ArtifactExpander() { + @Override + public void expand(Artifact artifact, Collection<? super Artifact> output) { + output.add(artifact); + } + }; + private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); private FileSystem fs; private Path execRoot; private SimpleSpawn simpleSpawn; private FakeActionInputFileCache fakeFileCache; - + private Digest inputDigest; + private RemoteSpawnRunner client; private FileOutErr outErr; - private long timeoutMillis = 0; - - private final SpawnExecutionPolicy simplePolicy = new SpawnExecutionPolicy() { - @Override - public void lockOutputFiles() throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public ActionInputFileCache getActionInputFileCache() { - return fakeFileCache; - } - - @Override - public long getTimeoutMillis() { - return timeoutMillis; - } - - @Override - public FileOutErr getFileOutErr() { - return outErr; - } - - @Override - public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException { - return new SpawnInputExpander(/*strict*/false) - .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace"); - } - - @Override - public void report(ProgressStatus state) { - // TODO(ulfjack): Test that the right calls are made. - } - }; + private Server fakeServer; + + private final SpawnExecutionPolicy simplePolicy = + new SpawnExecutionPolicy() { + @Override + public void lockOutputFiles() throws InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + public ActionInputFileCache getActionInputFileCache() { + return fakeFileCache; + } + + @Override + public long getTimeoutMillis() { + return 0; + } + + @Override + public FileOutErr getFileOutErr() { + return outErr; + } + + @Override + public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException { + return new SpawnInputExpander(/*strict*/ false) + .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace"); + } + + @Override + public void report(ProgressStatus state) { + // TODO(ulfjack): Test that the right calls are made. + } + }; @Before public final void setUp() throws Exception { + String fakeServerName = "fake server for " + getClass(); + // Use a mutable service registry for later registering the service impl for each test case. + fakeServer = + InProcessServerBuilder.forName(fakeServerName) + .fallbackHandlerRegistry(serviceRegistry) + .directExecutor() + .build() + .start(); + + Chunker.setDefaultChunkSizeForTesting(1000); // Enough for everything to be one chunk. fs = new InMemoryFileSystem(); execRoot = fs.getPath("/exec/root"); FileSystemUtils.createDirectoryAndParents(execRoot); fakeFileCache = new FakeActionInputFileCache(execRoot); - simpleSpawn = new SimpleSpawn( - new FakeOwner("Mnemonic", "Progress Message"), - ImmutableList.of("/bin/echo", "Hi!"), - ImmutableMap.of("VARIABLE", "value"), - /*executionInfo=*/ImmutableMap.<String, String>of(), - /*inputs=*/ImmutableList.of(ActionInputHelper.fromPath("input")), - /*outputs=*/ImmutableList.<ActionInput>of(), - ResourceSet.ZERO - ); + simpleSpawn = + new SimpleSpawn( + new FakeOwner("Mnemonic", "Progress Message"), + ImmutableList.of("/bin/echo", "Hi!"), + ImmutableMap.of("VARIABLE", "value"), + /*executionInfo=*/ ImmutableMap.<String, String>of(), + /*inputs=*/ ImmutableList.of(ActionInputHelper.fromPath("input")), + /*outputs=*/ ImmutableList.<ActionInput>of(), + ResourceSet.ZERO); Path stdout = fs.getPath("/tmp/stdout"); Path stderr = fs.getPath("/tmp/stderr"); FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory()); FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory()); outErr = new FileOutErr(stdout, stderr); + RemoteOptions options = Options.getDefaults(RemoteOptions.class); + Channel channel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(); + GrpcRemoteExecutor executor = new GrpcRemoteExecutor(channel, ChannelOptions.DEFAULT, options); + GrpcActionCache remoteCache = new GrpcActionCache(channel, ChannelOptions.DEFAULT, options); + client = new RemoteSpawnRunner(execRoot, options, executor, remoteCache); + inputDigest = fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz"); } - private void scratch(ActionInput input, String content) throws IOException { - Path inputFile = execRoot.getRelative(input.getExecPath()); - FileSystemUtils.writeContentAsLatin1(inputFile, content); - fakeFileCache.setDigest( - simpleSpawn.getInputFiles().get(0), ByteString.copyFrom(inputFile.getSHA1Digest())); + @After + public void tearDown() throws Exception { + fakeServer.shutdownNow(); } @Test public void cacheHit() throws Exception { - GrpcCasInterface casIface = Mockito.mock(GrpcCasInterface.class); - GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class); - GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class); - RemoteOptions options = Options.getDefaults(RemoteOptions.class); - GrpcRemoteExecutor executor = - new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface); - RemoteSpawnRunner client = new RemoteSpawnRunner(execRoot, options, executor); - - scratch(simpleSpawn.getInputFiles().get(0), "xyz"); - - ExecutionCacheReply reply = ExecutionCacheReply.newBuilder() - .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true)) - .setResult(ActionResult.newBuilder().setReturnCode(0)) - .build(); - when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply); + serviceRegistry.addService( + new ActionCacheImplBase() { + @Override + public void getActionResult( + GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { + responseObserver.onNext(ActionResult.getDefaultInstance()); + responseObserver.onCompleted(); + } + }); SpawnResult result = client.exec(simpleSpawn, simplePolicy); - verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class)); assertThat(result.setupSuccess()).isTrue(); assertThat(result.exitCode()).isEqualTo(0); assertThat(outErr.hasRecordedOutput()).isFalse(); @@ -167,69 +199,154 @@ public class GrpcRemoteExecutionClientTest { @Test public void cacheHitWithOutput() throws Exception { - InMemoryCas casIface = new InMemoryCas(); - GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class); - GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class); - RemoteOptions options = Options.getDefaults(RemoteOptions.class); - GrpcRemoteExecutor executor = - new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface); - RemoteSpawnRunner client = new RemoteSpawnRunner(execRoot, options, executor); - - scratch(simpleSpawn.getInputFiles().get(0), "xyz"); - byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8); - byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8); - ContentDigest stdOutDigest = casIface.put(cacheStdOut); - ContentDigest stdErrDigest = casIface.put(cacheStdErr); - - ExecutionCacheReply reply = ExecutionCacheReply.newBuilder() - .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true)) - .setResult(ActionResult.newBuilder() - .setReturnCode(0) - .setStdoutDigest(stdOutDigest) - .setStderrDigest(stdErrDigest)) - .build(); - when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply); + final Digest stdOutDigest = Digests.computeDigestUtf8("stdout"); + final Digest stdErrDigest = Digests.computeDigestUtf8("stderr"); + serviceRegistry.addService( + new ActionCacheImplBase() { + @Override + public void getActionResult( + GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { + responseObserver.onNext( + ActionResult.newBuilder() + .setStdoutDigest(stdOutDigest) + .setStderrDigest(stdErrDigest) + .build()); + responseObserver.onCompleted(); + } + }); + serviceRegistry.addService( + new FakeImmutableCacheByteStreamImpl(stdOutDigest, "stdout", stdErrDigest, "stderr")); + + SpawnResult result = client.exec(simpleSpawn, simplePolicy); + assertThat(result.setupSuccess()).isTrue(); + assertThat(result.exitCode()).isEqualTo(0); + assertThat(outErr.outAsLatin1()).isEqualTo("stdout"); + assertThat(outErr.errAsLatin1()).isEqualTo("stderr"); + } + + @Test + public void cacheHitWithInlineOutput() throws Exception { + serviceRegistry.addService( + new ActionCacheImplBase() { + @Override + public void getActionResult( + GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { + responseObserver.onNext( + ActionResult.newBuilder() + .setStdoutRaw(ByteString.copyFromUtf8("stdout")) + .setStderrRaw(ByteString.copyFromUtf8("stderr")) + .build()); + responseObserver.onCompleted(); + } + }); SpawnResult result = client.exec(simpleSpawn, simplePolicy); - verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class)); assertThat(result.setupSuccess()).isTrue(); assertThat(result.exitCode()).isEqualTo(0); assertThat(outErr.outAsLatin1()).isEqualTo("stdout"); assertThat(outErr.errAsLatin1()).isEqualTo("stderr"); } + private Answer<StreamObserver<WriteRequest>> blobWriteAnswer(final byte[] data) { + final Digest digest = Digests.computeDigest(data); + return new Answer<StreamObserver<WriteRequest>>() { + @Override + public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + final StreamObserver<WriteResponse> responseObserver = + (StreamObserver<WriteResponse>) invocation.getArguments()[0]; + return new StreamObserver<WriteRequest>() { + @Override + public void onNext(WriteRequest request) { + assertThat(request.getResourceName()).contains(digest.getHash()); + assertThat(request.getFinishWrite()).isTrue(); + assertThat(request.getData().toByteArray()).isEqualTo(data); + responseObserver.onNext( + WriteResponse.newBuilder().setCommittedSize(request.getData().size()).build()); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + + @Override + public void onError(Throwable t) { + fail("An error occurred: " + t); + } + }; + } + }; + } + @Test public void remotelyExecute() throws Exception { - InMemoryCas casIface = new InMemoryCas(); - GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class); - GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class); - RemoteOptions options = Options.getDefaults(RemoteOptions.class); - GrpcRemoteExecutor executor = - new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface); - RemoteSpawnRunner client = new RemoteSpawnRunner(execRoot, options, executor); - - scratch(simpleSpawn.getInputFiles().get(0), "xyz"); - byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8); - byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8); - ContentDigest stdOutDigest = casIface.put(cacheStdOut); - ContentDigest stdErrDigest = casIface.put(cacheStdErr); - - ExecutionCacheReply reply = ExecutionCacheReply.newBuilder() - .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true)) - .build(); - when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply); - - when(executionIface.execute(any(ExecuteRequest.class))).thenReturn(ImmutableList.of( - ExecuteReply.newBuilder() - .setStatus(ExecutionStatus.newBuilder().setSucceeded(true)) - .setResult(ActionResult.newBuilder() - .setReturnCode(0) - .setStdoutDigest(stdOutDigest) - .setStderrDigest(stdErrDigest)) - .build()).iterator()); + // getActionResult mock returns null by default, meaning a cache miss. + serviceRegistry.addService( + new ActionCacheImplBase() { + @Override + public void getActionResult( + GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { + responseObserver.onError( + StatusProto.toStatusRuntimeException( + Status.newBuilder().setCode(Code.NOT_FOUND.getNumber()).build())); + } + }); + final ActionResult actionResult = + ActionResult.newBuilder() + .setStdoutRaw(ByteString.copyFromUtf8("stdout")) + .setStderrRaw(ByteString.copyFromUtf8("stderr")) + .build(); + serviceRegistry.addService( + new ExecutionImplBase() { + @Override + public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { + responseObserver.onNext( + Operation.newBuilder() + .setDone(true) + .setResponse( + Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build())) + .build()); + responseObserver.onCompleted(); + } + }); + final Command command = + Command.newBuilder() + .addAllArguments(ImmutableList.of("/bin/echo", "Hi!")) + .addEnvironmentVariables( + Command.EnvironmentVariable.newBuilder() + .setName("VARIABLE") + .setValue("value") + .build()) + .build(); + final Digest cmdDigest = Digests.computeDigest(command); + serviceRegistry.addService( + new ContentAddressableStorageImplBase() { + @Override + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver<FindMissingBlobsResponse> responseObserver) { + FindMissingBlobsResponse.Builder b = FindMissingBlobsResponse.newBuilder(); + final Set<Digest> requested = ImmutableSet.copyOf(request.getBlobDigestsList()); + if (requested.contains(cmdDigest)) { + b.addMissingBlobDigests(cmdDigest); + } else if (requested.contains(inputDigest)) { + b.addMissingBlobDigests(inputDigest); + } else { + fail("Unexpected call to findMissingBlobs: " + request); + } + responseObserver.onNext(b.build()); + responseObserver.onCompleted(); + } + }); + + ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class); + when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject())) + .thenAnswer(blobWriteAnswer(command.toByteArray())) + .thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8))); + serviceRegistry.addService(mockByteStreamImpl); SpawnResult result = client.exec(simpleSpawn, simplePolicy); - verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class)); assertThat(result.setupSuccess()).isTrue(); assertThat(result.exitCode()).isEqualTo(0); assertThat(outErr.outAsLatin1()).isEqualTo("stdout"); diff --git a/src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java b/src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java deleted file mode 100644 index cba62a536f..0000000000 --- a/src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2017 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote; - -import com.google.common.base.Preconditions; -import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.protobuf.ByteString; -import io.grpc.stub.StreamObserver; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** An in-memory implementation of GrpcCasInterface. */ -final class InMemoryCas implements GrpcCasInterface { - private final Map<ByteString, ByteString> content = new HashMap<>(); - - public ContentDigest put(byte[] data) { - ContentDigest digest = ContentDigests.computeDigest(data); - ByteString key = digest.getDigest(); - ByteString value = ByteString.copyFrom(data); - content.put(key, value); - return digest; - } - - @Override - public CasLookupReply lookup(CasLookupRequest request) { - CasStatus.Builder result = CasStatus.newBuilder(); - for (ContentDigest digest : request.getDigestList()) { - ByteString key = digest.getDigest(); - if (!content.containsKey(key)) { - result.addMissingDigest(digest); - } - } - if (result.getMissingDigestCount() != 0) { - result.setError(CasStatus.ErrorCode.MISSING_DIGEST); - } else { - result.setSucceeded(true); - } - return CasLookupReply.newBuilder().setStatus(result).build(); - } - - @Override - public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) { - return CasUploadTreeMetadataReply.newBuilder() - .setStatus(CasStatus.newBuilder().setSucceeded(true)) - .build(); - } - - @Override - public CasDownloadTreeMetadataReply downloadTreeMetadata( - CasDownloadTreeMetadataRequest request) { - throw new UnsupportedOperationException(); - } - - @Override - public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) { - List<CasDownloadReply> result = new ArrayList<>(); - for (ContentDigest digest : request.getDigestList()) { - CasDownloadReply.Builder builder = CasDownloadReply.newBuilder(); - ByteString item = content.get(digest.getDigest()); - if (item != null) { - builder.setStatus(CasStatus.newBuilder().setSucceeded(true)); - builder.setData(BlobChunk.newBuilder().setData(item).setDigest(digest)); - } else { - throw new IllegalStateException(); - } - result.add(builder.build()); - } - return result.iterator(); - } - - @Override - public StreamObserver<CasUploadBlobRequest> uploadBlobAsync( - final StreamObserver<CasUploadBlobReply> responseObserver) { - return new StreamObserver<CasUploadBlobRequest>() { - private ContentDigest digest; - private ByteArrayOutputStream current; - - @Override - public void onNext(CasUploadBlobRequest value) { - BlobChunk chunk = value.getData(); - if (chunk.hasDigest()) { - Preconditions.checkState(digest == null); - digest = chunk.getDigest(); - current = new ByteArrayOutputStream(); - } - try { - current.write(chunk.getData().toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } - responseObserver.onNext( - CasUploadBlobReply.newBuilder() - .setStatus(CasStatus.newBuilder().setSucceeded(true)) - .build()); - } - - @Override - public void onError(Throwable t) { - throw new RuntimeException(t); - } - - @Override - public void onCompleted() { - ContentDigest check = ContentDigests.computeDigest(current.toByteArray()); - Preconditions.checkState(check.equals(digest), "%s != %s", digest, check); - ByteString key = digest.getDigest(); - ByteString value = ByteString.copyFrom(current.toByteArray()); - digest = null; - current = null; - content.put(key, value); - responseObserver.onCompleted(); - } - }; - } -}
\ No newline at end of file diff --git a/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java index f9ef9111a6..94374f6364 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java @@ -22,14 +22,14 @@ import com.google.devtools.build.lib.actions.ActionInputFileCache; import com.google.devtools.build.lib.actions.Artifact; import com.google.devtools.build.lib.actions.Root; import com.google.devtools.build.lib.exec.SingleBuildFileCache; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode; import com.google.devtools.build.lib.testutil.Scratch; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.FileSystem.HashFunction; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.Directory; import java.util.ArrayList; import java.util.SortedMap; import java.util.TreeMap; @@ -54,8 +54,8 @@ public class TreeNodeRepositoryTest { } private TreeNodeRepository createTestTreeNodeRepository() { - ActionInputFileCache inputFileCache = new SingleBuildFileCache( - rootPath.getPathString(), scratch.getFileSystem()); + ActionInputFileCache inputFileCache = + new SingleBuildFileCache(rootPath.getPathString(), scratch.getFileSystem()); return new TreeNodeRepository(rootPath, inputFileCache); } @@ -86,34 +86,26 @@ public class TreeNodeRepositoryTest { TreeNode barNode = aNode.getChildEntries().get(0).getChild(); repo.computeMerkleDigests(root); - ImmutableCollection<ContentDigest> digests = repo.getAllDigests(root); - ContentDigest rootDigest = repo.getMerkleDigest(root); - ContentDigest aDigest = repo.getMerkleDigest(aNode); - ContentDigest fooDigest = repo.getMerkleDigest(fooNode); - ContentDigest fooContentsDigest = ContentDigests.computeDigest(foo.getPath()); - ContentDigest barDigest = repo.getMerkleDigest(barNode); - ContentDigest barContentsDigest = ContentDigests.computeDigest(bar.getPath()); - assertThat(digests) - .containsExactly( - rootDigest, aDigest, barDigest, barContentsDigest, fooDigest, fooContentsDigest); + ImmutableCollection<Digest> digests = repo.getAllDigests(root); + Digest rootDigest = repo.getMerkleDigest(root); + Digest aDigest = repo.getMerkleDigest(aNode); + Digest fooDigest = repo.getMerkleDigest(fooNode); // The contents digest. + Digest barDigest = repo.getMerkleDigest(barNode); + assertThat(digests).containsExactly(rootDigest, aDigest, barDigest, fooDigest); - ArrayList<FileNode> fileNodes = new ArrayList<>(); + ArrayList<Directory> directories = new ArrayList<>(); ArrayList<ActionInput> actionInputs = new ArrayList<>(); - repo.getDataFromDigests(digests, actionInputs, fileNodes); + repo.getDataFromDigests(digests, actionInputs, directories); assertThat(actionInputs).containsExactly(bar, foo); - assertThat(fileNodes).hasSize(4); - FileNode rootFileNode = fileNodes.get(0); - assertThat(rootFileNode.getChild(0).getPath()).isEqualTo("a"); - assertThat(rootFileNode.getChild(0).getDigest()).isEqualTo(aDigest); - FileNode aFileNode = fileNodes.get(1); - assertThat(aFileNode.getChild(0).getPath()).isEqualTo("bar"); - assertThat(aFileNode.getChild(0).getDigest()).isEqualTo(barDigest); - assertThat(aFileNode.getChild(1).getPath()).isEqualTo("foo"); - assertThat(aFileNode.getChild(1).getDigest()).isEqualTo(fooDigest); - FileNode barFileNode = fileNodes.get(2); - assertThat(barFileNode.getFileMetadata().getDigest()).isEqualTo(barContentsDigest); - FileNode fooFileNode = fileNodes.get(3); - assertThat(fooFileNode.getFileMetadata().getDigest()).isEqualTo(fooContentsDigest); + assertThat(directories).hasSize(2); + Directory rootDirectory = directories.get(0); + assertThat(rootDirectory.getDirectories(0).getName()).isEqualTo("a"); + assertThat(rootDirectory.getDirectories(0).getDigest()).isEqualTo(aDigest); + Directory aDirectory = directories.get(1); + assertThat(aDirectory.getFiles(0).getName()).isEqualTo("bar"); + assertThat(aDirectory.getFiles(0).getDigest()).isEqualTo(barDigest); + assertThat(aDirectory.getFiles(1).getName()).isEqualTo("foo"); + assertThat(aDirectory.getFiles(1).getDigest()).isEqualTo(fooDigest); } @Test @@ -124,8 +116,8 @@ public class TreeNodeRepositoryTest { TreeNodeRepository repo = createTestTreeNodeRepository(); TreeNode root = repo.buildFromActionInputs(ImmutableList.<ActionInput>of(foo1, foo2, foo3)); repo.computeMerkleDigests(root); - // Reusing same node for the "foo" subtree: only need the root, root child, foo, and contents: - assertThat(repo.getAllDigests(root)).hasSize(4); + // Reusing same node for the "foo" subtree: only need the root, root child, and foo contents: + assertThat(repo.getAllDigests(root)).hasSize(3); } @Test @@ -141,16 +133,12 @@ public class TreeNodeRepositoryTest { TreeNode aNode = root.getChildEntries().get(0).getChild(); TreeNode fooNode = aNode.getChildEntries().get(1).getChild(); // foo > bar in sort order! TreeNode barNode = aNode.getChildEntries().get(0).getChild(); - ImmutableCollection<ContentDigest> digests = repo.getAllDigests(root); - ContentDigest rootDigest = repo.getMerkleDigest(root); - ContentDigest aDigest = repo.getMerkleDigest(aNode); - ContentDigest fooDigest = repo.getMerkleDigest(fooNode); - ContentDigest fooContentsDigest = ContentDigests.computeDigest(foo.getPath()); - ContentDigest barDigest = repo.getMerkleDigest(barNode); - ContentDigest barContentsDigest = ContentDigests.computeDigest(new byte[0]); - assertThat(digests) - .containsExactly( - rootDigest, aDigest, barDigest, barContentsDigest, fooDigest, fooContentsDigest); + ImmutableCollection<Digest> digests = repo.getAllDigests(root); + Digest rootDigest = repo.getMerkleDigest(root); + Digest aDigest = repo.getMerkleDigest(aNode); + Digest fooDigest = repo.getMerkleDigest(fooNode); + Digest barDigest = repo.getMerkleDigest(barNode); + assertThat(digests).containsExactly(rootDigest, aDigest, barDigest, fooDigest); } @Test diff --git a/src/test/shell/bazel/remote_execution_test.sh b/src/test/shell/bazel/remote_execution_test.sh index 6ed3101589..37515bce29 100755 --- a/src/test/shell/bazel/remote_execution_test.sh +++ b/src/test/shell/bazel/remote_execution_test.sh @@ -30,7 +30,6 @@ function set_up() { ${bazel_data}/src/tools/remote_worker/remote_worker \ --work_path=${work_path} \ --listen_port=${worker_port} \ - --grpc_max_chunk_size_bytes=120000000 \ --hazelcast_standalone_listen_port=${hazelcast_port} \ --pid_file=${pid_file} >& $TEST_log & local wait_seconds=0 @@ -187,7 +186,6 @@ EOF bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \ --spawn_strategy=remote \ --noremote_local_fallback \ - --grpc_max_chunk_size_bytes=120000000 \ --remote_executor=localhost:${worker_port} \ --remote_cache=localhost:${worker_port} \ //a:large_output >& $TEST_log \ diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD index a91e5e5d73..ddbb5ede52 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD @@ -25,13 +25,19 @@ java_library( "//src/main/java/com/google/devtools/build/lib:vfs", "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/common/options", - "//src/main/protobuf:remote_protocol_java_grpc", - "//src/main/protobuf:remote_protocol_java_proto", "//third_party:guava", "//third_party:hazelcast", "//third_party:netty", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", "//third_party/protobuf:protobuf_java_util", + "@googleapis//:google_bytestream_bytestream_java_grpc", + "@googleapis//:google_bytestream_bytestream_java_proto", + "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc", + "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto", + "@googleapis//:google_longrunning_operations_java_proto", + "@googleapis//:google_rpc_code_java_proto", + "@googleapis//:google_rpc_error_details_java_proto", + "@googleapis//:google_rpc_status_java_proto", ], ) diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java index 8558be720f..0eee6695e1 100644 --- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java +++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java @@ -14,43 +14,16 @@ package com.google.devtools.build.remote; -import com.google.common.collect.ImmutableList; -import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; -import com.google.devtools.build.lib.exec.SpawnResult.Status; +import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase; +import com.google.bytestream.ByteStreamProto.ReadRequest; +import com.google.bytestream.ByteStreamProto.ReadResponse; +import com.google.bytestream.ByteStreamProto.WriteRequest; +import com.google.bytestream.ByteStreamProto.WriteResponse; import com.google.devtools.build.lib.remote.CacheNotFoundException; -import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase; -import com.google.devtools.build.lib.remote.ChannelOptions; import com.google.devtools.build.lib.remote.Chunker; -import com.google.devtools.build.lib.remote.ContentDigests; -import com.google.devtools.build.lib.remote.ContentDigests.ActionKey; -import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase; -import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceImplBase; +import com.google.devtools.build.lib.remote.Digests; +import com.google.devtools.build.lib.remote.Digests.ActionKey; import com.google.devtools.build.lib.remote.RemoteOptions; -import com.google.devtools.build.lib.remote.RemoteProtocol; -import com.google.devtools.build.lib.remote.RemoteProtocol.Action; -import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult; -import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.Command.EnvironmentEntry; -import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus; -import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode; -import com.google.devtools.build.lib.remote.RemoteProtocol.Platform; import com.google.devtools.build.lib.remote.SimpleBlobStore; import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache; import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory; @@ -61,23 +34,48 @@ import com.google.devtools.build.lib.shell.CommandResult; import com.google.devtools.build.lib.shell.TimeoutKillableObserver; import com.google.devtools.build.lib.unix.UnixFileSystem; import com.google.devtools.build.lib.util.OS; -import com.google.devtools.build.lib.util.Preconditions; import com.google.devtools.build.lib.util.ProcessUtils; import com.google.devtools.build.lib.vfs.FileSystem; import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.JavaIoFileSystem; import com.google.devtools.build.lib.vfs.Path; -import com.google.devtools.common.options.Options; import com.google.devtools.common.options.OptionsParser; +import com.google.devtools.remoteexecution.v1test.Action; +import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase; +import com.google.devtools.remoteexecution.v1test.ActionResult; +import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest; +import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse; +import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable; +import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase; +import com.google.devtools.remoteexecution.v1test.Digest; +import com.google.devtools.remoteexecution.v1test.ExecuteRequest; +import com.google.devtools.remoteexecution.v1test.ExecuteResponse; +import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest; +import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse; +import com.google.devtools.remoteexecution.v1test.GetActionResultRequest; +import com.google.devtools.remoteexecution.v1test.Platform; +import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest; +import com.google.devtools.remoteexecution.v1test.UpdateBlobRequest; +import com.google.longrunning.Operation; +import com.google.protobuf.Any; +import com.google.protobuf.ByteString; import com.google.protobuf.Duration; import com.google.protobuf.util.Durations; +import com.google.rpc.BadRequest; +import com.google.rpc.BadRequest.FieldViolation; +import com.google.rpc.Code; +import com.google.rpc.Status; import io.grpc.Server; +import io.grpc.StatusRuntimeException; import io.grpc.netty.NettyServerBuilder; +import io.grpc.protobuf.StatusProto; import io.grpc.stub.StreamObserver; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.io.PrintWriter; +import java.io.StringWriter; import java.nio.file.FileAlreadyExistsException; import java.util.ArrayList; import java.util.Arrays; @@ -88,6 +86,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; +import javax.annotation.Nullable; /** * Implements a remote worker that accepts work items as protobufs. The server implementation is @@ -96,13 +95,10 @@ import java.util.logging.Logger; public class RemoteWorker { private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName()); private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER); - - private static final int LOCAL_EXEC_ERROR = -1; - private static final int SIGALRM_EXIT_CODE = /*SIGNAL_BASE=*/128 + /*SIGALRM=*/14; - - private final CasServiceImplBase casServer; - private final ExecuteServiceImplBase execServer; - private final ExecutionCacheServiceImplBase execCacheServer; + private final ContentAddressableStorageImplBase casServer; + private final ByteStreamImplBase bsServer; + private final ExecutionImplBase execServer; + private final ActionCacheImplBase actionCacheServer; private final SimpleBlobStoreActionCache cache; private final RemoteWorkerOptions workerOptions; private final RemoteOptions remoteOptions; @@ -123,16 +119,16 @@ public class RemoteWorker { execServer = null; } casServer = new CasServer(); - execCacheServer = new ExecutionCacheServer(); + bsServer = new ByteStreamServer(); + actionCacheServer = new ActionCacheServer(); } public Server startServer() throws IOException { NettyServerBuilder b = NettyServerBuilder.forPort(workerOptions.listenPort) - .maxMessageSize(ChannelOptions.create(Options.getDefaults(AuthAndTLSOptions.class), - remoteOptions.grpcMaxChunkSizeBytes).maxMessageSize()) .addService(casServer) - .addService(execCacheServer); + .addService(bsServer) + .addService(actionCacheServer); if (execServer != null) { b.addService(execServer); } else { @@ -147,215 +143,258 @@ public class RemoteWorker { return server; } - class CasServer extends CasServiceImplBase { - private static final int MAX_MEMORY_KBYTES = 512 * 1024; + private static @Nullable Digest parseDigestFromResourceName(String resourceName) { + try { + String[] tokens = resourceName.split("/"); + if (tokens.length < 2) { + return null; + } + String hash = tokens[tokens.length - 2]; + long size = Long.parseLong(tokens[tokens.length - 1]); + return Digests.buildDigest(hash, size); + } catch (NumberFormatException e) { + return null; + } + } + + private static StatusRuntimeException internalError(Exception e) { + return StatusProto.toStatusRuntimeException(internalErrorStatus(e)); + } + + private static com.google.rpc.Status internalErrorStatus(Exception e) { + return Status.newBuilder() + .setCode(Code.INTERNAL.getNumber()) + .setMessage("Internal error: " + e) + .build(); + } + + private static StatusRuntimeException notFoundError(Digest digest) { + return StatusProto.toStatusRuntimeException(notFoundStatus(digest)); + } + + private static com.google.rpc.Status notFoundStatus(Digest digest) { + return Status.newBuilder() + .setCode(Code.NOT_FOUND.getNumber()) + .setMessage("Digest not found:" + digest) + .build(); + } + + private static StatusRuntimeException invalidArgumentError(String field, String desc) { + return StatusProto.toStatusRuntimeException(invalidArgumentStatus(field, desc)); + } + + private static com.google.rpc.Status invalidArgumentStatus(String field, String desc) { + FieldViolation v = FieldViolation.newBuilder().setField(field).setDescription(desc).build(); + return Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT.getNumber()) + .setMessage("invalid argument(s): " + field + ": " + desc) + .addDetails(Any.pack(BadRequest.newBuilder().addFieldViolations(v).build())) + .build(); + } + class CasServer extends ContentAddressableStorageImplBase { @Override - public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> responseObserver) { - CasLookupReply.Builder reply = CasLookupReply.newBuilder(); - CasStatus.Builder status = reply.getStatusBuilder(); - for (ContentDigest digest : request.getDigestList()) { + public void findMissingBlobs( + FindMissingBlobsRequest request, + StreamObserver<FindMissingBlobsResponse> responseObserver) { + FindMissingBlobsResponse.Builder response = FindMissingBlobsResponse.newBuilder(); + for (Digest digest : request.getBlobDigestsList()) { if (!cache.containsKey(digest)) { - status.addMissingDigest(digest); + response.addMissingBlobDigests(digest); } } - if (status.getMissingDigestCount() > 0) { - status.setSucceeded(false); - status.setError(CasStatus.ErrorCode.MISSING_DIGEST); - } else { - status.setSucceeded(true); - } - responseObserver.onNext(reply.build()); + responseObserver.onNext(response.build()); responseObserver.onCompleted(); } @Override - public void uploadTreeMetadata( - CasUploadTreeMetadataRequest request, - StreamObserver<CasUploadTreeMetadataReply> responseObserver) { - try { - for (FileNode treeNode : request.getTreeNodeList()) { - cache.uploadBlob(treeNode.toByteArray()); + public void batchUpdateBlobs( + BatchUpdateBlobsRequest request, + StreamObserver<BatchUpdateBlobsResponse> responseObserver) { + BatchUpdateBlobsResponse.Builder batchResponse = BatchUpdateBlobsResponse.newBuilder(); + for (UpdateBlobRequest r : request.getRequestsList()) { + BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder(); + try { + Digest digest = cache.uploadBlob(r.getData().toByteArray()); + if (!r.getContentDigest().equals(digest)) { + String err = + "Upload digest " + r.getContentDigest() + " did not match data digest: " + digest; + resp.setStatus(invalidArgumentStatus("content_digest", err)); + continue; + } + resp.getStatusBuilder().setCode(Code.OK.getNumber()); + } catch (Exception e) { + resp.setStatus(internalErrorStatus(e)); } - responseObserver.onNext( - CasUploadTreeMetadataReply.newBuilder() - .setStatus(CasStatus.newBuilder().setSucceeded(true)) - .build()); - } catch (Exception e) { - LOG.warning("Request failed: " + e.toString()); - CasUploadTreeMetadataReply.Builder reply = CasUploadTreeMetadataReply.newBuilder(); - reply - .getStatusBuilder() - .setSucceeded(false) - .setError(CasStatus.ErrorCode.UNKNOWN) - .setErrorDetail(e.toString()); - responseObserver.onNext(reply.build()); - } finally { - responseObserver.onCompleted(); } + responseObserver.onNext(batchResponse.build()); + responseObserver.onCompleted(); } + } + class ByteStreamServer extends ByteStreamImplBase { @Override - public void downloadBlob( - CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> responseObserver) { - CasDownloadReply.Builder reply = CasDownloadReply.newBuilder(); - CasStatus.Builder status = reply.getStatusBuilder(); - for (ContentDigest digest : request.getDigestList()) { - if (!cache.containsKey(digest)) { - status.addMissingDigest(digest); - } + public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) { + Digest digest = parseDigestFromResourceName(request.getResourceName()); + if (digest == null) { + responseObserver.onError( + invalidArgumentError( + "resource_name", + "Failed parsing digest from resource_name:" + request.getResourceName())); } - if (status.getMissingDigestCount() > 0) { - status.setSucceeded(false); - status.setError(CasStatus.ErrorCode.MISSING_DIGEST); - responseObserver.onNext(reply.build()); - responseObserver.onCompleted(); + if (!cache.containsKey(digest)) { + responseObserver.onError(notFoundError(digest)); return; } - status.setSucceeded(true); try { - // This still relies on the total blob size to be small enough to fit in memory - // simultaneously! TODO(olaola): refactor to fix this if the need arises. - Chunker.Builder b = new Chunker.Builder().chunkSize(remoteOptions.grpcMaxChunkSizeBytes); - for (ContentDigest digest : request.getDigestList()) { - b.addInput(cache.downloadBlob(digest)); - } - Chunker c = b.build(); + // This still relies on the blob size to be small enough to fit in memory. + // TODO(olaola): refactor to fix this if the need arises. + Chunker c = Chunker.from(cache.downloadBlob(digest)); while (c.hasNext()) { - reply.setData(c.next()); - responseObserver.onNext(reply.build()); - if (reply.hasStatus()) { - reply.clearStatus(); // Only send status on first chunk. - } + responseObserver.onNext( + ReadResponse.newBuilder().setData(ByteString.copyFrom(c.next().getData())).build()); } - } catch (IOException e) { - // This cannot happen, as we are chunking in-memory blobs. - throw new RuntimeException("Internal error: " + e); + responseObserver.onCompleted(); } catch (CacheNotFoundException e) { // This can only happen if an item gets evicted right after we check. - reply.clearData(); - status.setSucceeded(false); - status.setError(CasStatus.ErrorCode.MISSING_DIGEST); - status.addMissingDigest(e.getMissingDigest()); - responseObserver.onNext(reply.build()); - } finally { - responseObserver.onCompleted(); + responseObserver.onError(notFoundError(digest)); + } catch (Exception e) { + LOG.warning("Read request failed: " + e); + responseObserver.onError(internalError(e)); } } @Override - public StreamObserver<CasUploadBlobRequest> uploadBlob( - final StreamObserver<CasUploadBlobReply> responseObserver) { - return new StreamObserver<CasUploadBlobRequest>() { + public StreamObserver<WriteRequest> write( + final StreamObserver<WriteResponse> responseObserver) { + return new StreamObserver<WriteRequest>() { byte[] blob = null; - ContentDigest digest = null; + Digest digest = null; long offset = 0; + String resourceName = null; + boolean closed = false; @Override - public void onNext(CasUploadBlobRequest request) { - BlobChunk chunk = request.getData(); - try { - if (chunk.hasDigest()) { - // Check if the previous chunk was really done. - Preconditions.checkArgument( - digest == null || offset == 0, - "Missing input chunk for digest %s", - digest == null ? "" : ContentDigests.toString(digest)); - digest = chunk.getDigest(); - // This unconditionally downloads the whole blob into memory! - Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES); - blob = new byte[(int) digest.getSizeBytes()]; - } - Preconditions.checkArgument(digest != null, "First chunk contains no digest"); - Preconditions.checkArgument( - offset == chunk.getOffset(), - "Missing input chunk for digest %s", - ContentDigests.toString(digest)); - if (digest.getSizeBytes() > 0) { - chunk.getData().copyTo(blob, (int) offset); - offset = (offset + chunk.getData().size()) % digest.getSizeBytes(); - } - if (offset == 0) { - ContentDigest uploadedDigest = cache.uploadBlob(blob); - Preconditions.checkArgument( - uploadedDigest.equals(digest), - "Digest mismatch: client sent %s, server computed %s", - ContentDigests.toString(digest), - ContentDigests.toString(uploadedDigest)); - } - } catch (Exception e) { - LOG.warning("Request failed: " + e.toString()); - CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder(); - reply - .getStatusBuilder() - .setSucceeded(false) - .setError( - e instanceof IllegalArgumentException - ? CasStatus.ErrorCode.INVALID_ARGUMENT - : CasStatus.ErrorCode.UNKNOWN) - .setErrorDetail(e.toString()); - responseObserver.onNext(reply.build()); + public void onNext(WriteRequest request) { + if (closed) { + return; + } + if (digest == null) { + resourceName = request.getResourceName(); + digest = parseDigestFromResourceName(resourceName); + blob = new byte[(int) digest.getSizeBytes()]; + } + if (digest == null) { + responseObserver.onError( + invalidArgumentError( + "resource_name", + "Failed parsing digest from resource_name:" + request.getResourceName())); + closed = true; + return; + } + if (request.getWriteOffset() != offset) { + responseObserver.onError( + invalidArgumentError( + "write_offset", + "Expected:" + offset + ", received: " + request.getWriteOffset())); + closed = true; + return; + } + if (!request.getResourceName().isEmpty() + && !request.getResourceName().equals(resourceName)) { + responseObserver.onError( + invalidArgumentError( + "resource_name", + "Expected:" + resourceName + ", received: " + request.getResourceName())); + closed = true; + return; + } + long size = request.getData().size(); + if (size > 0) { + request.getData().copyTo(blob, (int) offset); + offset += size; + } + boolean shouldFinishWrite = offset == digest.getSizeBytes(); + if (shouldFinishWrite != request.getFinishWrite()) { + responseObserver.onError( + invalidArgumentError( + "finish_write", + "Expected:" + shouldFinishWrite + ", received: " + request.getFinishWrite())); + closed = true; } } @Override public void onError(Throwable t) { - LOG.warning("Request errored remotely: " + t); + LOG.warning("Write request errored remotely: " + t); + closed = true; } @Override public void onCompleted() { - responseObserver.onCompleted(); + if (closed) { + return; + } + if (digest == null || offset != digest.getSizeBytes()) { + responseObserver.onError( + StatusProto.toStatusRuntimeException( + Status.newBuilder() + .setCode(Code.FAILED_PRECONDITION.getNumber()) + .setMessage("Request completed before all data was sent.") + .build())); + closed = true; + return; + } + try { + Digest d = cache.uploadBlob(blob); + if (!d.equals(digest)) { + String err = "Received digest " + digest + " does not match computed digest " + d; + responseObserver.onError(invalidArgumentError("resource_name", err)); + closed = true; + return; + } + responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(offset).build()); + responseObserver.onCompleted(); + } catch (Exception e) { + LOG.warning("Write request failed: " + e); + responseObserver.onError(internalError(e)); + closed = true; + } } }; } } - class ExecutionCacheServer extends ExecutionCacheServiceImplBase { + class ActionCacheServer extends ActionCacheImplBase { @Override - public void getCachedResult( - ExecutionCacheRequest request, StreamObserver<ExecutionCacheReply> responseObserver) { + public void getActionResult( + GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) { try { - ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest()); - ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder(); + ActionKey actionKey = Digests.unsafeActionKeyFromDigest(request.getActionDigest()); ActionResult result = cache.getCachedActionResult(actionKey); - if (result != null) { - reply.setResult(result); + if (result == null) { + responseObserver.onError(notFoundError(request.getActionDigest())); + return; } - reply.getStatusBuilder().setSucceeded(true); - responseObserver.onNext(reply.build()); - } catch (Exception e) { - LOG.warning("getCachedActionResult request failed: " + e.toString()); - ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder(); - reply - .getStatusBuilder() - .setSucceeded(false) - .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN); - responseObserver.onNext(reply.build()); - } finally { + responseObserver.onNext(result); responseObserver.onCompleted(); + } catch (Exception e) { + LOG.warning("getActionResult request failed: " + e); + responseObserver.onError(internalError(e)); } } @Override - public void setCachedResult( - ExecutionCacheSetRequest request, StreamObserver<ExecutionCacheSetReply> responseObserver) { + public void updateActionResult( + UpdateActionResultRequest request, StreamObserver<ActionResult> responseObserver) { try { - ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest()); - cache.setCachedActionResult(actionKey, request.getResult()); - ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder(); - reply.getStatusBuilder().setSucceeded(true); - responseObserver.onNext(reply.build()); - } catch (Exception e) { - LOG.warning("setCachedActionResult request failed: " + e.toString()); - ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder(); - reply - .getStatusBuilder() - .setSucceeded(false) - .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN); - responseObserver.onNext(reply.build()); - } finally { + ActionKey actionKey = Digests.unsafeActionKeyFromDigest(request.getActionDigest()); + cache.setCachedActionResult(actionKey, request.getActionResult()); + responseObserver.onNext(request.getActionResult()); responseObserver.onCompleted(); + } catch (Exception e) { + LOG.warning("updateActionResult request failed: " + e); + responseObserver.onError(internalError(e)); } } } @@ -363,23 +402,26 @@ public class RemoteWorker { // How long to wait for the uid command. private static final Duration uidTimeout = Durations.fromMicros(30); - class ExecutionServer extends ExecuteServiceImplBase { + class ExecutionServer extends ExecutionImplBase { private final Path workPath; //The name of the container image entry in the Platform proto - // (see src/main/protobuf/remote_protocol.proto and + // (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and // experimental_remote_platform_override in // src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java) public static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image"; + private static final int LOCAL_EXEC_ERROR = -1; + public ExecutionServer(Path workPath) { this.workPath = workPath; } - private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) { + private Map<String, String> getEnvironmentVariables( + com.google.devtools.remoteexecution.v1test.Command command) { HashMap<String, String> result = new HashMap<>(); - for (EnvironmentEntry entry : command.getEnvironmentList()) { - result.put(entry.getVariable(), entry.getValue()); + for (EnvironmentVariable v : command.getEnvironmentVariablesList()) { + result.put(v.getName(), v.getValue()); } return result; } @@ -412,17 +454,14 @@ public class RemoteWorker { // null. Otherwise returns docker container name from the parameters. private String dockerContainer(Action action) throws IllegalArgumentException { String result = null; - List<Platform.Property> entries = action.getPlatform().getEntryList(); - - for (Platform.Property entry : entries) { - if (entry.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) { - if (result == null) { - result = entry.getValue(); - } else { + for (Platform.Property property : action.getPlatform().getPropertiesList()) { + if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) { + if (result != null) { // Multiple container name entries throw new IllegalArgumentException( "Multiple entries for " + CONTAINER_IMAGE_ENTRY_NAME + " in action.Platform"); } + result = property.getValue(); } } return result; @@ -479,38 +518,34 @@ public class RemoteWorker { new File(pathString)); } - public ExecuteReply execute(Action action, Path execRoot) - throws IOException, InterruptedException, IllegalArgumentException { - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - RemoteProtocol.Command command; - try { - command = - RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest())); - cache.downloadTree(action.getInputRootDigest(), execRoot); - } catch (CacheNotFoundException e) { - LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest())); - return ExecuteReply.newBuilder() - .setCasError( - CasStatus.newBuilder() - .setSucceeded(false) - .addMissingDigest(e.getMissingDigest()) - .setError(CasStatus.ErrorCode.MISSING_DIGEST) - .setErrorDetail(e.toString())) - .setStatus( - ExecutionStatus.newBuilder() - .setExecuted(false) - .setSucceeded(false) - .setError( - e.getMissingDigest() == action.getCommandDigest() - ? ExecutionStatus.ErrorCode.MISSING_COMMAND - : ExecutionStatus.ErrorCode.MISSING_INPUT) - .setErrorDetail(e.toString())) - .build(); + static final int MAX_BLOB_SIZE_FOR_INLINE = 1024 * 10; + + private void passOutErr(byte[] stdout, byte[] stderr, ActionResult.Builder result) + throws InterruptedException { + if (stdout.length <= MAX_BLOB_SIZE_FOR_INLINE) { + result.setStdoutRaw(ByteString.copyFrom(stdout)); + } else if (stdout.length > 0) { + result.setStdoutDigest(cache.uploadBlob(stdout)); + } + if (stderr.length <= MAX_BLOB_SIZE_FOR_INLINE) { + result.setStderrRaw(ByteString.copyFrom(stderr)); + } else if (stderr.length > 0) { + result.setStderrDigest(cache.uploadBlob(stderr)); } + } - List<Path> outputs = new ArrayList<>(action.getOutputPathList().size()); - for (String output : action.getOutputPathList()) { + public ActionResult execute(Action action, Path execRoot) + throws IOException, InterruptedException, IllegalArgumentException, CacheNotFoundException { + ByteArrayOutputStream stdout = new ByteArrayOutputStream(); + ByteArrayOutputStream stderr = new ByteArrayOutputStream(); + ActionResult.Builder result = ActionResult.newBuilder(); + com.google.devtools.remoteexecution.v1test.Command command = + com.google.devtools.remoteexecution.v1test.Command.parseFrom( + cache.downloadBlob(action.getCommandDigest())); + cache.downloadTree(action.getInputRootDigest(), execRoot); + + List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size()); + for (String output : action.getOutputFilesList()) { Path file = execRoot.getRelative(output); if (file.exists()) { throw new FileAlreadyExistsException("Output file already exists: " + file); @@ -518,63 +553,58 @@ public class RemoteWorker { FileSystemUtils.createDirectoryAndParents(file.getParentDirectory()); outputs.add(file); } + // TODO(olaola): support output directories. // TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that // implementation instead of copying it. - // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet. - int timeoutSeconds = 60 * 15; Command cmd = getCommand( action, - command.getArgvList().toArray(new String[] {}), + command.getArgumentsList().toArray(new String[] {}), getEnvironmentVariables(command), execRoot.getPathString()); - long startTime = System.currentTimeMillis(); - CommandResult cmdResult; + CommandResult cmdResult = null; try { cmdResult = cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true); } catch (AbnormalTerminationException e) { cmdResult = e.getResult(); } catch (CommandException e) { - // At the time this comment was written, this must be a ExecFailedException encapsulating an - // IOException from the underlying Subprocess.Factory. - LOG.warning("Execution failed for " + command.getArgvList()); - return ExecuteReply.newBuilder() - .setResult( - ActionResult.newBuilder() - .setReturnCode(LOCAL_EXEC_ERROR)) - .setStatus( - ExecutionStatus.newBuilder() - .setExecuted(false) - .setSucceeded(false) - .setError(ExecutionStatus.ErrorCode.EXEC_FAILED) - .setErrorDetail(e.toString())) - .build(); + // At the time this comment was written, this must be a ExecFailedException encapsulating + // an IOException from the underlying Subprocess.Factory. + } + final int timeoutSeconds = 60 * 15; + // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet. + boolean wasTimeout = + cmdResult != null && cmdResult.getTerminationStatus().timedout() + || wasTimeout(timeoutSeconds, System.currentTimeMillis() - startTime); + int exitCode; + if (wasTimeout) { + final String errMessage = + "Command:\n" + + command.getArgumentsList() + + "\nexceeded deadline of " + + timeoutSeconds + + "seconds"; + LOG.warning(errMessage); + throw StatusProto.toStatusRuntimeException( + Status.newBuilder() + .setCode(Code.DEADLINE_EXCEEDED.getNumber()) + .setMessage(errMessage) + .build()); + } else if (cmdResult == null) { + exitCode = LOCAL_EXEC_ERROR; + } else { + exitCode = cmdResult.getTerminationStatus().getRawExitCode(); + } + + passOutErr(stdout.toByteArray(), stderr.toByteArray(), result); + cache.uploadAllResults(execRoot, outputs, result); + ActionResult finalResult = result.setExitCode(exitCode).build(); + if (exitCode == 0) { + cache.setCachedActionResult(Digests.computeActionKey(action), finalResult); } - long wallTime = System.currentTimeMillis() - startTime; - boolean wasTimeout = cmdResult.getTerminationStatus().timedout() - || wasTimeout(timeoutSeconds, wallTime); - Status status = wasTimeout ? Status.TIMEOUT : Status.SUCCESS; - int exitCode = status == Status.TIMEOUT - ? SIGALRM_EXIT_CODE - : cmdResult.getTerminationStatus().getRawExitCode(); - - ImmutableList<ContentDigest> outErrDigests = - cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray())); - ContentDigest stdoutDigest = outErrDigests.get(0); - ContentDigest stderrDigest = outErrDigests.get(1); - ActionResult.Builder actionResult = - ActionResult.newBuilder() - .setReturnCode(exitCode) - .setStdoutDigest(stdoutDigest) - .setStderrDigest(stderrDigest); - cache.uploadAllResults(execRoot, outputs, actionResult); - cache.setCachedActionResult(ContentDigests.computeActionKey(action), actionResult.build()); - return ExecuteReply.newBuilder() - .setResult(actionResult) - .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true)) - .build(); + return finalResult; } private boolean wasTimeout(int timeoutSeconds, long wallTimeMillis) { @@ -582,7 +612,7 @@ public class RemoteWorker { } @Override - public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) { + public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) { Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString()); try { tempRoot.createDirectory(); @@ -591,33 +621,42 @@ public class RemoteWorker { "Work received has " + request.getTotalInputFileCount() + " input files and " - + request.getAction().getOutputPathCount() + + request.getAction().getOutputFilesCount() + " output files."); } - ExecuteReply reply = execute(request.getAction(), tempRoot); - responseObserver.onNext(reply); + ActionResult result = execute(request.getAction(), tempRoot); + responseObserver.onNext( + Operation.newBuilder() + .setDone(true) + .setResponse(Any.pack(ExecuteResponse.newBuilder().setResult(result).build())) + .build()); + responseObserver.onCompleted(); if (workerOptions.debug) { - if (!reply.getStatus().getSucceeded()) { - LOG.warning("Work failed. Request: " + request.toString() + "."); - } else if (LOG_FINER) { - LOG.fine("Work completed."); - } - } - if (!workerOptions.debug) { - FileSystemUtils.deleteTree(tempRoot); + LOG.fine("Work completed."); + LOG.warning("Preserving work directory " + tempRoot); } else { - LOG.warning("Preserving work directory " + tempRoot.toString() + "."); + FileSystemUtils.deleteTree(tempRoot); } - } catch (IOException | InterruptedException e) { - LOG.log(Level.SEVERE, "Failure", e); - ExecuteReply.Builder reply = ExecuteReply.newBuilder(); - reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString()); - responseObserver.onNext(reply.build()); + } catch (CacheNotFoundException e) { + LOG.warning("Cache miss on " + e.getMissingDigest()); + responseObserver.onError(notFoundError(e.getMissingDigest())); + } catch (StatusRuntimeException e) { + responseObserver.onError(e); + } catch (IllegalArgumentException e) { + responseObserver.onError( + StatusProto.toStatusRuntimeException( + Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT.getNumber()) + .setMessage(e.toString()) + .build())); + } catch (Exception e) { + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + LOG.log(Level.SEVERE, "Work failed: " + e + stringWriter.toString()); + responseObserver.onError(internalError(e)); if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } - } finally { - responseObserver.onCompleted(); } } } |