aboutsummaryrefslogtreecommitdiffhomepage
path: root/src/main/java
diff options
context:
space:
mode:
authorGravatar olaola <olaola@google.com>2017-06-09 04:33:25 +0200
committerGravatar Jakob Buchgraber <buchgr@google.com>2017-06-09 10:23:23 +0200
commit460a105ea61c23b1553a89d8f7a14170ad359e08 (patch)
tree066f0f98f03536efad02ab5ad0e64d9d4c5e054f /src/main/java
parent88677dbef8a9a1f88a35229f342fdf8523273456 (diff)
Also refactored away the various *Interface* files, no need since unit testing can be done with mocking the appropriate gRPC Impl classes directly (see tests). This also fixes the RemoteSpawnRunner, which should use different objects for remote caching and remote execution, the same way RemoteSpawnStrategy does. RELNOTES: n/a PiperOrigin-RevId: 158473700
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/BUILD9
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java10
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java102
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java24
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/Chunker.java218
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/Digests.java (renamed from src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java)61
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java603
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java43
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java27
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java131
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java80
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/README.md4
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java32
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java136
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java163
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java3
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java115
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java138
20 files changed, 863 insertions, 1094 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 3a3c304d1e..d2e82efa60 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -23,8 +23,6 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/standalone",
"//src/main/java/com/google/devtools/common/options",
- "//src/main/protobuf:remote_protocol_java_grpc",
- "//src/main/protobuf:remote_protocol_java_proto",
"//third_party:apache_httpclient",
"//third_party:apache_httpcore",
"//third_party:auth",
@@ -35,6 +33,13 @@ java_library(
"//third_party:netty",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
+ "//third_party/protobuf:protobuf_java_util",
+ "@googleapis//:google_bytestream_bytestream_java_grpc",
+ "@googleapis//:google_bytestream_bytestream_java_proto",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
+ "@googleapis//:google_longrunning_operations_java_proto",
+ "@googleapis//:google_rpc_error_details_java_proto",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
index 5820e0bf95..cffc58fd4b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
@@ -14,25 +14,25 @@
package com.google.devtools.build.lib.remote;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.remoteexecution.v1test.Digest;
/**
* An exception to indicate cache misses.
* TODO(olaola): have a class of checked RemoteCacheExceptions.
*/
public final class CacheNotFoundException extends Exception {
- private final ContentDigest missingDigest;
+ private final Digest missingDigest;
- CacheNotFoundException(ContentDigest missingDigest) {
+ CacheNotFoundException(Digest missingDigest) {
this.missingDigest = missingDigest;
}
- public ContentDigest getMissingDigest() {
+ public Digest getMissingDigest() {
return missingDigest;
}
@Override
public String toString() {
- return "Missing digest: " + ContentDigests.toString(missingDigest);
+ return "Missing digest: " + missingDigest;
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java
index 87da7c2405..62605e1c2d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java
@@ -13,28 +13,27 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.actions.Spawns;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.exec.SpawnResult;
import com.google.devtools.build.lib.exec.SpawnResult.Status;
import com.google.devtools.build.lib.exec.SpawnRunner;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.StatusRuntimeException;
@@ -56,14 +55,11 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
// TODO(olaola): This will be set on a per-action basis instead.
private final Platform platform;
- private final RemoteActionCache actionCache;
+ private final RemoteActionCache remoteCache;
private final SpawnRunner delegate;
CachedLocalSpawnRunner(
- Path execRoot,
- RemoteOptions options,
- RemoteActionCache actionCache,
- SpawnRunner delegate) {
+ Path execRoot, RemoteOptions options, RemoteActionCache remoteCache, SpawnRunner delegate) {
this.execRoot = execRoot;
this.options = options;
if (options.experimentalRemotePlatformOverride != null) {
@@ -77,15 +73,13 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
} else {
platform = null;
}
- this.actionCache = actionCache;
+ this.remoteCache = remoteCache;
this.delegate = delegate;
}
@Override
- public SpawnResult exec(
- Spawn spawn,
- SpawnExecutionPolicy policy)
- throws InterruptedException, IOException, ExecException {
+ public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy)
+ throws InterruptedException, IOException, ExecException {
ActionKey actionKey = null;
String mnemonic = spawn.getMnemonic();
@@ -100,24 +94,26 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
Action action =
buildAction(
spawn.getOutputFiles(),
- ContentDigests.computeDigest(command),
- repository.getMerkleDigest(inputRoot));
+ Digests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot),
+ // TODO(olaola): set sensible local and remote timeouts.
+ Spawns.getTimeoutSeconds(spawn, 120));
// Look up action cache, and reuse the action output if it is found.
- actionKey = ContentDigests.computeActionKey(action);
+ actionKey = Digests.computeActionKey(action);
ActionResult result =
- this.options.remoteAcceptCached ? actionCache.getCachedActionResult(actionKey) : null;
+ this.options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try {
// TODO(ulfjack): Download stdout, stderr, and the output files in a single call.
- actionCache.downloadAllResults(result, execRoot);
+ remoteCache.downloadAllResults(result, execRoot);
passRemoteOutErr(result, policy.getFileOutErr());
return new SpawnResult.Builder()
.setStatus(Status.SUCCESS)
- .setExitCode(result.getReturnCode())
+ .setExitCode(result.getExitCode())
.build();
} catch (CacheNotFoundException e) {
// TODO(ulfjack): Track down who throws this exception in what cases and double-check that
@@ -138,44 +134,62 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
}
private Action buildAction(
- Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Collection<? extends ActionInput> outputs,
+ Digest command,
+ Digest inputRoot,
+ long timeoutSeconds) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
// Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
for (ActionInput output : outputs) {
- action.addOutputPath(output.getExecPathString());
+ // TODO: output directories should be handled here, when they are supported.
+ action.addOutputFiles(output.getExecPathString());
}
if (platform != null) {
action.setPlatform(platform);
}
+ action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds));
return action.build();
}
- private static Command buildCommand(
- List<String> arguments, ImmutableMap<String, String> environment) {
+ private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
Command.Builder command = Command.newBuilder();
- command.addAllArgv(arguments);
+ command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(environment.keySet());
for (String var : variables) {
- command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var));
}
return command.build();
}
- private void passRemoteOutErr(
- ActionResult result, FileOutErr outErr)
- throws CacheNotFoundException {
- ImmutableList<byte[]> streams =
- actionCache.downloadBlobs(
- ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
- outErr.printOut(new String(streams.get(0), UTF_8));
- outErr.printErr(new String(streams.get(1), UTF_8));
+ private void passRemoteOutErr(ActionResult result, FileOutErr outErr) throws IOException {
+ try {
+ if (!result.getStdoutRaw().isEmpty()) {
+ result.getStdoutRaw().writeTo(outErr.getOutputStream());
+ outErr.getOutputStream().flush();
+ } else if (result.hasStdoutDigest()) {
+ byte[] stdoutBytes = remoteCache.downloadBlob(result.getStdoutDigest());
+ outErr.getOutputStream().write(stdoutBytes);
+ outErr.getOutputStream().flush();
+ }
+ if (!result.getStderrRaw().isEmpty()) {
+ result.getStderrRaw().writeTo(outErr.getErrorStream());
+ outErr.getErrorStream().flush();
+ } else if (result.hasStderrDigest()) {
+ byte[] stderrBytes = remoteCache.downloadBlob(result.getStderrDigest());
+ outErr.getErrorStream().write(stderrBytes);
+ outErr.getErrorStream().flush();
+ }
+ } catch (CacheNotFoundException e) {
+ outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss.");
+ outErr.getOutputStream().flush();
+ }
}
private void writeCacheEntry(Spawn spawn, FileOutErr outErr, ActionKey actionKey)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
ArrayList<Path> outputFiles = new ArrayList<>();
for (ActionInput output : spawn.getOutputFiles()) {
Path outputPath = execRoot.getRelative(output.getExecPathString());
@@ -186,11 +200,11 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
}
}
ActionResult.Builder result = ActionResult.newBuilder();
- actionCache.uploadAllResults(execRoot, outputFiles, result);
- ContentDigest stderr = actionCache.uploadFileContents(outErr.getErrorPath());
- ContentDigest stdout = actionCache.uploadFileContents(outErr.getOutputPath());
+ remoteCache.uploadAllResults(execRoot, outputFiles, result);
+ Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath());
+ Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath());
result.setStderrDigest(stderr);
result.setStdoutDigest(stdout);
- actionCache.setCachedActionResult(actionKey, result.build());
+ remoteCache.setCachedActionResult(actionKey, result.build());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
index c2654586bf..a78458d135 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
@@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.common.options.Options;
import io.grpc.CallCredentials;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.GrpcSslContexts;
@@ -33,24 +34,21 @@ import javax.net.ssl.SSLException;
/** Instantiate all authentication helpers from build options. */
@ThreadSafe
public final class ChannelOptions {
- private final int maxMessageSize;
private final boolean tlsEnabled;
private final SslContext sslContext;
private final String tlsAuthorityOverride;
private final CallCredentials credentials;
- private static final int CHUNK_MESSAGE_OVERHEAD = 1024;
+ public static final ChannelOptions DEFAULT = create(Options.getDefaults(AuthAndTLSOptions.class));
private ChannelOptions(
boolean tlsEnabled,
SslContext sslContext,
String tlsAuthorityOverride,
- CallCredentials credentials,
- int maxMessageSize) {
+ CallCredentials credentials) {
this.tlsEnabled = tlsEnabled;
this.sslContext = sslContext;
this.tlsAuthorityOverride = tlsAuthorityOverride;
this.credentials = credentials;
- this.maxMessageSize = maxMessageSize;
}
public boolean tlsEnabled() {
@@ -69,15 +67,10 @@ public final class ChannelOptions {
return sslContext;
}
- public int maxMessageSize() {
- return maxMessageSize;
- }
-
- public static ChannelOptions create(AuthAndTLSOptions options, int grpcMaxChunkSizeBytes) {
+ public static ChannelOptions create(AuthAndTLSOptions options) {
try {
return create(
options,
- grpcMaxChunkSizeBytes,
options.authCredentials != null
? new FileInputStream(options.authCredentials)
: null);
@@ -88,7 +81,8 @@ public final class ChannelOptions {
}
@VisibleForTesting
- public static ChannelOptions create(AuthAndTLSOptions options, int grpcMaxChunkSizeBytes,
+ public static ChannelOptions create(
+ AuthAndTLSOptions options,
@Nullable InputStream credentialsInputStream) {
boolean tlsEnabled = options.tlsEnabled;
SslContext sslContext = null;
@@ -118,11 +112,7 @@ public final class ChannelOptions {
"Failed initializing auth credentials for remote cache/execution " + e);
}
}
- final int maxMessageSize =
- Math.max(
- 4 * 1024 * 1024 /* GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE */,
- grpcMaxChunkSizeBytes + CHUNK_MESSAGE_OVERHEAD);
return new ChannelOptions(
- tlsEnabled, sslContext, tlsAuthorityOverride, credentials, maxMessageSize);
+ tlsEnabled, sslContext, tlsAuthorityOverride, credentials);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index a9bdd6dc13..0c407a29b7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -14,114 +14,162 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.protobuf.ByteString;
+import com.google.devtools.remoteexecution.v1test.Digest;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
-import javax.annotation.Nullable;
-/** An iterator-type object that transforms byte sources into a stream of BlobChunk messages. */
+/** An iterator-type object that transforms byte sources into a stream of Chunks. */
public final class Chunker {
+ // This is effectively final, should be changed only in unit-tests!
+ private static int DEFAULT_CHUNK_SIZE = 1024 * 16;
+ private static byte[] EMPTY_BLOB = new byte[0];
+
+ @VisibleForTesting
+ static void setDefaultChunkSizeForTesting(int value) {
+ DEFAULT_CHUNK_SIZE = value;
+ }
+
+ public static int getDefaultChunkSize() {
+ return DEFAULT_CHUNK_SIZE;
+ }
+
+ /** A piece of a byte[] blob. */
+ public static final class Chunk {
+
+ private final Digest digest;
+ private final long offset;
+    // TODO(olaola): consider saving data in a different format than byte[].
+ private final byte[] data;
+
+ @VisibleForTesting
+ public Chunk(Digest digest, byte[] data, long offset) {
+ this.digest = digest;
+ this.data = data;
+ this.offset = offset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof Chunk)) {
+ return false;
+ }
+ Chunk other = (Chunk) o;
+ return other.offset == offset
+ && other.digest.equals(digest)
+ && Arrays.equals(other.data, data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(digest, offset, Arrays.hashCode(data));
+ }
+
+ public Digest getDigest() {
+ return digest;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+    // This returns the internal array directly (not a copy), for efficiency; callers must not mutate it.
+ public byte[] getData() {
+ return data;
+ }
+ }
+
/** An Item is an opaque digestable source of bytes. */
interface Item {
- ContentDigest getDigest() throws IOException;
+ Digest getDigest() throws IOException;
InputStream getInputStream() throws IOException;
}
private final Iterator<Item> inputIterator;
private InputStream currentStream;
- private final Set<ContentDigest> digests;
- private ContentDigest digest;
+ private Digest digest;
private long bytesLeft;
private final int chunkSize;
- Chunker(
- Iterator<Item> inputIterator,
- int chunkSize,
- // If present, specifies which digests to output out of the whole input.
- @Nullable Set<ContentDigest> digests)
- throws IOException {
+ Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0");
- this.digests = digests;
this.inputIterator = inputIterator;
this.chunkSize = chunkSize;
advanceInput();
}
- Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
- this(inputIterator, chunkSize, null);
+ Chunker(Item input, int chunkSize) throws IOException {
+ this(Iterators.singletonIterator(input), chunkSize);
}
- Chunker(Item input, int chunkSize) throws IOException {
- this(Iterators.singletonIterator(input), chunkSize, ImmutableSet.of(input.getDigest()));
- }
-
- private void advanceInput() throws IOException {
- do {
- if (inputIterator != null && inputIterator.hasNext()) {
- Item input = inputIterator.next();
- digest = input.getDigest();
- currentStream = input.getInputStream();
- bytesLeft = digest.getSizeBytes();
- } else {
- digest = null;
- currentStream = null;
- bytesLeft = 0;
- }
- } while (digest != null && digests != null && !digests.contains(digest));
+ public void advanceInput() throws IOException {
+ if (inputIterator != null && inputIterator.hasNext()) {
+ Item input = inputIterator.next();
+ digest = input.getDigest();
+ currentStream = input.getInputStream();
+ bytesLeft = digest.getSizeBytes();
+ } else {
+ digest = null;
+ currentStream = null;
+ bytesLeft = 0;
+ }
}
- /** True if the object has more BlobChunk elements. */
+ /** True if the object has more Chunk elements. */
public boolean hasNext() {
return currentStream != null;
}
- /** Consume the next BlobChunk element. */
- public BlobChunk next() throws IOException {
+ /** Consume the next Chunk element. */
+ public Chunk next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
- BlobChunk.Builder chunk = BlobChunk.newBuilder();
- long offset = digest.getSizeBytes() - bytesLeft;
- if (offset == 0) {
- chunk.setDigest(digest);
- } else {
- chunk.setOffset(offset);
- }
+ final long offset = digest.getSizeBytes() - bytesLeft;
+ byte[] blob = EMPTY_BLOB;
if (bytesLeft > 0) {
- byte[] blob = new byte[(int) Math.min(bytesLeft, chunkSize)];
+ blob = new byte[(int) Math.min(bytesLeft, chunkSize)];
currentStream.read(blob);
- chunk.setData(ByteString.copyFrom(blob));
bytesLeft -= blob.length;
}
+ final Chunk result = new Chunk(digest, blob, offset);
if (bytesLeft == 0) {
currentStream.close();
advanceInput(); // Sets the current stream to null, if it was the last.
}
- return chunk.build();
+ return result;
}
static Item toItem(final byte[] blob) {
return new Item() {
+ Digest digest = null;
+
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.computeDigest(blob);
+ public Digest getDigest() throws IOException {
+ if (digest == null) {
+ digest = Digests.computeDigest(blob);
+ }
+ return digest;
}
@Override
@@ -133,9 +181,14 @@ public final class Chunker {
static Item toItem(final Path file) {
return new Item() {
+ Digest digest = null;
+
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.computeDigest(file);
+ public Digest getDigest() throws IOException {
+ if (digest == null) {
+ digest = Digests.computeDigest(file);
+ }
+ return digest;
}
@Override
@@ -152,8 +205,8 @@ public final class Chunker {
}
return new Item() {
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.getDigestFromInputCache(input, inputCache);
+ public Digest getDigest() throws IOException {
+ return Digests.getDigestFromInputCache(input, inputCache);
}
@Override
@@ -165,9 +218,14 @@ public final class Chunker {
static Item toItem(final VirtualActionInput input) {
return new Item() {
+ Digest digest = null;
+
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.computeDigest(input);
+ public Digest getDigest() throws IOException {
+ if (digest == null) {
+ digest = Digests.computeDigest(input);
+ }
+ return digest;
}
@Override
@@ -189,27 +247,67 @@ public final class Chunker {
return new Chunker(toItem(input, inputCache, execRoot), chunkSize);
}
+ /**
+ * Create a Chunker from a given ActionInput, taking its digest from the provided
+ * ActionInputFileCache.
+ */
+ public static Chunker from(ActionInput input, ActionInputFileCache inputCache, Path execRoot)
+ throws IOException {
+ return from(input, getDefaultChunkSize(), inputCache, execRoot);
+ }
+
/** Create a Chunker from a given blob and chunkSize. */
public static Chunker from(byte[] blob, int chunkSize) throws IOException {
return new Chunker(toItem(blob), chunkSize);
}
+ /** Create a Chunker from a given blob. */
+ public static Chunker from(byte[] blob) throws IOException {
+ return from(blob, getDefaultChunkSize());
+ }
+
/** Create a Chunker from a given Path and chunkSize. */
public static Chunker from(Path file, int chunkSize) throws IOException {
return new Chunker(toItem(file), chunkSize);
}
+ /** Create a Chunker from a given Path. */
+ public static Chunker from(Path file) throws IOException {
+ return from(file, getDefaultChunkSize());
+ }
+
+ static class MemberOf implements Predicate<Item> {
+ private final Set<Digest> digests;
+
+ public MemberOf(Set<Digest> digests) {
+ this.digests = digests;
+ }
+
+ @Override
+ public boolean apply(Item item) {
+ try {
+ return digests.contains(item.getDigest());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/**
* Create a Chunker from multiple input sources. The order of the sources provided to the Builder
* will be the same order they will be chunked by.
*/
public static final class Builder {
private final ArrayList<Item> items = new ArrayList<>();
- private Set<ContentDigest> digests = null;
- private int chunkSize = 0;
+ private Set<Digest> digests = null;
+ private int chunkSize = getDefaultChunkSize();
public Chunker build() throws IOException {
- return new Chunker(items.iterator(), chunkSize, digests);
+ return new Chunker(
+ digests == null
+ ? items.iterator()
+ : Iterators.filter(items.iterator(), new MemberOf(digests)),
+ chunkSize);
}
public Builder chunkSize(int chunkSize) {
@@ -221,7 +319,7 @@ public final class Chunker {
* Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS
* uploads where a list of digests missing from the CAS is known.
*/
- public Builder onlyUseDigests(Set<ContentDigest> digests) {
+ public Builder onlyUseDigests(Set<Digest> digests) {
this.digests = digests;
return this;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java b/src/main/java/com/google/devtools/build/lib/remote/Digests.java
index 21a739c468..9121bc2782 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Digests.java
@@ -14,34 +14,35 @@
package com.google.devtools.build.lib.remote;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.protobuf.ByteString;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-/** Helper methods relating to computing ContentDigest messages for remote execution. */
+/** Helper methods relating to computing Digest messages for remote execution. */
@ThreadSafe
-public final class ContentDigests {
- private ContentDigests() {}
+public final class Digests {
+ private Digests() {}
- public static ContentDigest computeDigest(byte[] blob) {
- return buildDigest(Hashing.sha1().hashBytes(blob).asBytes(), blob.length);
+ public static Digest computeDigest(byte[] blob) {
+ return buildDigest(Hashing.sha1().hashBytes(blob).toString(), blob.length);
}
- public static ContentDigest computeDigest(Path file) throws IOException {
+ public static Digest computeDigest(Path file) throws IOException {
return buildDigest(file.getSHA1Digest(), file.getFileSize());
}
- public static ContentDigest computeDigest(VirtualActionInput input) throws IOException {
+ public static Digest computeDigest(VirtualActionInput input) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
input.writeTo(buffer);
return computeDigest(buffer.toByteArray());
@@ -52,22 +53,26 @@ public final class ContentDigests {
* bytes, but this implementation relies on the stability of the proto encoding, in particular
* between different platforms and languages. TODO(olaola): upgrade to a better implementation!
*/
- public static ContentDigest computeDigest(Message message) {
+ public static Digest computeDigest(Message message) {
return computeDigest(message.toByteArray());
}
+ public static Digest computeDigestUtf8(String str) {
+ return computeDigest(str.getBytes(UTF_8));
+ }
+
/**
- * A special type of ContentDigest that is used only as a remote action cache key. This is a
- * separate type in order to prevent accidentally using other ContentDigests as action keys.
+ * A special type of Digest that is used only as a remote action cache key. This is a
+ * separate type in order to prevent accidentally using other Digests as action keys.
*/
public static final class ActionKey {
- private final ContentDigest digest;
+ private final Digest digest;
- public ContentDigest getDigest() {
+ public Digest getDigest() {
return digest;
}
- private ActionKey(ContentDigest digest) {
+ private ActionKey(Digest digest) {
this.digest = digest;
}
}
@@ -77,29 +82,23 @@ public final class ContentDigests {
}
/**
- * Assumes that the given ContentDigest is a valid digest of an Action, and creates an ActionKey
+ * Assumes that the given Digest is a valid digest of an Action, and creates an ActionKey
* wrapper. This should not be called on the client side!
*/
- public static ActionKey unsafeActionKeyFromDigest(ContentDigest digest) {
+ public static ActionKey unsafeActionKeyFromDigest(Digest digest) {
return new ActionKey(digest);
}
- public static ContentDigest buildDigest(byte[] digest, long size) {
- ContentDigest.Builder b = ContentDigest.newBuilder();
- b.setDigest(ByteString.copyFrom(digest)).setSizeBytes(size);
- return b.build();
+ public static Digest buildDigest(byte[] hash, long size) {
+ return buildDigest(HashCode.fromBytes(hash).toString(), size);
}
- public static ContentDigest getDigestFromInputCache(ActionInput input, ActionInputFileCache cache)
- throws IOException {
- return buildDigest(cache.getDigest(input), cache.getSizeInBytes(input));
+ public static Digest buildDigest(String hexHash, long size) {
+ return Digest.newBuilder().setHash(hexHash).setSizeBytes(size).build();
}
- public static String toHexString(ContentDigest digest) {
- return HashCode.fromBytes(digest.getDigest().toByteArray()).toString();
- }
-
- public static String toString(ContentDigest digest) {
- return "<digest: " + toHexString(digest) + ", size: " + digest.getSizeBytes() + " bytes>";
+ public static Digest getDigestFromInputCache(ActionInput input, ActionInputFileCache cache)
+ throws IOException {
+ return buildDigest(cache.getDigest(input), cache.getSizeInBytes(input));
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
index d85125f15c..d93842244d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
@@ -14,52 +14,53 @@
package com.google.devtools.build.lib.remote;
+import com.google.bytestream.ByteStreamGrpc;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
-import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlockingStub;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
+import com.google.devtools.remoteexecution.v1test.OutputDirectory;
+import com.google.devtools.remoteexecution.v1test.OutputFile;
+import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -67,50 +68,76 @@ import java.util.concurrent.atomic.AtomicReference;
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@ThreadSafe
public class GrpcActionCache implements RemoteActionCache {
- private static final int MAX_MEMORY_KBYTES = 512 * 1024;
-
- /** Channel over which to send gRPC CAS queries. */
- private final GrpcCasInterface casIface;
-
- private final GrpcExecutionCacheInterface iface;
private final RemoteOptions options;
-
- public GrpcActionCache(
- RemoteOptions options, GrpcCasInterface casIface, GrpcExecutionCacheInterface iface) {
- this.options = options;
- this.casIface = casIface;
- this.iface = iface;
- }
+ private final ChannelOptions channelOptions;
+ private final Channel channel;
@VisibleForTesting
- public GrpcActionCache(
- Channel channel, RemoteOptions options, ChannelOptions channelOptions) {
+ public GrpcActionCache(Channel channel, ChannelOptions channelOptions, RemoteOptions options) {
this.options = options;
- this.casIface =
- GrpcInterfaces.casInterface(options.remoteTimeout, channel, channelOptions);
- this.iface =
- GrpcInterfaces.executionCacheInterface(options.remoteTimeout, channel, channelOptions);
+ this.channelOptions = channelOptions;
+ this.channel = channel;
}
- public GrpcActionCache(RemoteOptions options, ChannelOptions channelOptions) {
- this(RemoteUtils.createChannel(options.remoteCache, channelOptions), options, channelOptions);
- }
+ // All gRPC stubs are reused.
+ private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ContentAddressableStorageBlockingStub>() {
+ @Override
+ public ContentAddressableStorageBlockingStub get() {
+ return ContentAddressableStorageGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
+ private final Supplier<ByteStreamBlockingStub> bsBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ByteStreamBlockingStub>() {
+ @Override
+ public ByteStreamBlockingStub get() {
+ return ByteStreamGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
+ private final Supplier<ByteStreamStub> bsStub =
+ Suppliers.memoize(
+ new Supplier<ByteStreamStub>() {
+ @Override
+ public ByteStreamStub get() {
+ return ByteStreamGrpc.newStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
+ private final Supplier<ActionCacheBlockingStub> acBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ActionCacheBlockingStub>() {
+ @Override
+ public ActionCacheBlockingStub get() {
+ return ActionCacheGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
public static boolean isRemoteCacheOptions(RemoteOptions options) {
return options.remoteCache != null;
}
- private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) {
- CasLookupRequest.Builder request = CasLookupRequest.newBuilder().addAllDigest(digests);
- if (request.getDigestCount() == 0) {
+ private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests) {
+ FindMissingBlobsRequest.Builder request =
+ FindMissingBlobsRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .addAllBlobDigests(digests);
+ if (request.getBlobDigestsCount() == 0) {
return ImmutableSet.of();
}
- CasStatus status = casIface.lookup(request.build()).getStatus();
- if (!status.getSucceeded() && status.getError() != CasStatus.ErrorCode.MISSING_DIGEST) {
- // TODO(olaola): here and below, add basic retry logic on transient errors!
- throw new RuntimeException(status.getErrorDetail());
- }
- return ImmutableSet.copyOf(status.getMissingDigestList());
+ FindMissingBlobsResponse response = casBlockingStub.get().findMissingBlobs(request.build());
+ return ImmutableSet.copyOf(response.getMissingBlobDigestsList());
}
/**
@@ -122,26 +149,37 @@ public class GrpcActionCache implements RemoteActionCache {
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
// TODO(olaola): avoid querying all the digests, only ask for novel subtrees.
- ImmutableSet<ContentDigest> missingDigests = getMissingDigests(repository.getAllDigests(root));
+ ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root));
// Only upload data that was missing from the cache.
ArrayList<ActionInput> actionInputs = new ArrayList<>();
- ArrayList<FileNode> treeNodes = new ArrayList<>();
+ ArrayList<Directory> treeNodes = new ArrayList<>();
repository.getDataFromDigests(missingDigests, actionInputs, treeNodes);
if (!treeNodes.isEmpty()) {
- CasUploadTreeMetadataRequest.Builder metaRequest =
- CasUploadTreeMetadataRequest.newBuilder().addAllTreeNode(treeNodes);
- CasUploadTreeMetadataReply reply = casIface.uploadTreeMetadata(metaRequest.build());
- if (!reply.getStatus().getSucceeded()) {
- throw new RuntimeException(reply.getStatus().getErrorDetail());
+ // TODO(olaola): split this into multiple requests if total size is > 10MB.
+ BatchUpdateBlobsRequest.Builder treeBlobRequest =
+ BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName);
+ for (Directory d : treeNodes) {
+ final byte[] data = d.toByteArray();
+ treeBlobRequest
+ .addRequestsBuilder()
+ .setContentDigest(Digests.computeDigest(data))
+ .setData(ByteString.copyFrom(data));
+ }
+ BatchUpdateBlobsResponse response =
+ casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build());
+ // TODO(olaola): handle retries on transient errors.
+ for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) {
+ if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) {
+ throw StatusProto.toStatusRuntimeException(r.getStatus());
+ }
}
}
if (!actionInputs.isEmpty()) {
uploadChunks(
actionInputs.size(),
new Chunker.Builder()
- .chunkSize(options.grpcMaxChunkSizeBytes)
.addAllInputs(actionInputs, repository.getInputFileCache(), execRoot)
.onlyUseDigests(missingDigests)
.build());
@@ -152,21 +190,11 @@ public class GrpcActionCache implements RemoteActionCache {
* Download the entire tree data rooted by the given digest and write it into the given location.
*/
@Override
- public void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ public void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException {
throw new UnsupportedOperationException();
}
- private void handleDownloadStatus(CasStatus status) throws CacheNotFoundException {
- if (!status.getSucceeded()) {
- if (status.getError() == CasStatus.ErrorCode.MISSING_DIGEST) {
- throw new CacheNotFoundException(status.getMissingDigest(0));
- }
- // TODO(olaola): deal with other statuses better.
- throw new RuntimeException(status.getErrorDetail());
- }
- }
-
/**
* Download all results of a remotely executed action locally. TODO(olaola): will need to amend to
* include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating.
@@ -174,106 +202,85 @@ public class GrpcActionCache implements RemoteActionCache {
@Override
public void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException {
- if (result.getOutputList().isEmpty()) {
+ if (result.getOutputFilesList().isEmpty() && result.getOutputDirectoriesList().isEmpty()) {
return;
}
- // Send all the file requests in a single synchronous batch.
- // TODO(olaola): profile to maybe replace with separate concurrent requests.
- CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder();
- Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap = new HashMap<>();
- for (Output output : result.getOutputList()) {
- Path path = execRoot.getRelative(output.getPath());
- if (output.getContentCase() == ContentCase.FILE_METADATA) {
- FileMetadata fileMetadata = output.getFileMetadata();
- ContentDigest digest = fileMetadata.getDigest();
- if (digest.getSizeBytes() > 0) {
- request.addDigest(digest);
- metadataMap.put(digest, Pair.of(path, fileMetadata));
- } else {
- // Handle empty file locally.
- FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
- FileSystemUtils.writeContent(path, new byte[0]);
- }
+ for (OutputFile file : result.getOutputFilesList()) {
+ Path path = execRoot.getRelative(file.getPath());
+ FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
+ Digest digest = file.getDigest();
+ if (digest.getSizeBytes() == 0) {
+ // Handle empty file locally.
+ FileSystemUtils.writeContent(path, new byte[0]);
} else {
- downloadTree(output.getDigest(), path);
+ try (OutputStream stream = path.getOutputStream()) {
+ if (!file.getContent().isEmpty()) {
+ file.getContent().writeTo(stream);
+ } else {
+ Iterator<ReadResponse> replies = readBlob(digest);
+ while (replies.hasNext()) {
+ replies.next().getData().writeTo(stream);
+ }
+ }
+ }
}
+ path.setExecutable(file.getIsExecutable());
}
- Iterator<CasDownloadReply> replies = casIface.downloadBlob(request.build());
- Set<ContentDigest> results = new HashSet<>();
- while (replies.hasNext()) {
- results.add(createFileFromStream(metadataMap, replies));
- }
- for (ContentDigest digest : metadataMap.keySet()) {
- if (!results.contains(digest)) {
- throw new CacheNotFoundException(digest);
- }
+ for (OutputDirectory directory : result.getOutputDirectoriesList()) {
+ downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath()));
}
}
- private ContentDigest createFileFromStream(
- Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap, Iterator<CasDownloadReply> replies)
- throws IOException, CacheNotFoundException {
- Preconditions.checkArgument(replies.hasNext());
- CasDownloadReply reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
+ private Iterator<ReadResponse> readBlob(Digest digest) throws CacheNotFoundException {
+ String resourceName = "";
+ if (!options.remoteInstanceName.isEmpty()) {
+ resourceName += options.remoteInstanceName + "/";
}
- BlobChunk chunk = reply.getData();
- ContentDigest digest = chunk.getDigest();
- Preconditions.checkArgument(metadataMap.containsKey(digest));
- Pair<Path, FileMetadata> metadata = metadataMap.get(digest);
- Path path = metadata.first;
- FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
- try (OutputStream stream = path.getOutputStream()) {
- ByteString data = chunk.getData();
- data.writeTo(stream);
- long bytesLeft = digest.getSizeBytes() - data.size();
- while (bytesLeft > 0) {
- Preconditions.checkArgument(replies.hasNext());
- reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
- }
- chunk = reply.getData();
- data = chunk.getData();
- Preconditions.checkArgument(!chunk.hasDigest());
- Preconditions.checkArgument(chunk.getOffset() == digest.getSizeBytes() - bytesLeft);
- data.writeTo(stream);
- bytesLeft -= data.size();
+ resourceName += "blobs/" + digest.getHash() + "/" + digest.getSizeBytes();
+ try {
+ return bsBlockingStub
+ .get()
+ .read(ReadRequest.newBuilder().setResourceName(resourceName).build());
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
+ throw new CacheNotFoundException(digest);
}
- path.setExecutable(metadata.second.getExecutable());
+ throw e;
}
- return digest;
}
/** Upload all results of a locally executed action to the cache. */
@Override
public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result)
throws IOException, InterruptedException {
- ArrayList<ContentDigest> digests = new ArrayList<>();
- Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes);
+ ArrayList<Digest> digests = new ArrayList<>();
+ Chunker.Builder b = new Chunker.Builder();
for (Path file : files) {
- digests.add(ContentDigests.computeDigest(file));
+ if (!file.exists()) {
+ // We ignore requested results that have not been generated by the action.
+ continue;
+ }
+ if (file.isDirectory()) {
+ // TODO(olaola): to implement this for a directory, will need to create or pass a
+ // TreeNodeRepository to call uploadTree.
+ throw new UnsupportedOperationException("Storing a directory is not yet supported.");
+ }
+ digests.add(Digests.computeDigest(file));
b.addInput(file);
}
- ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
+ ImmutableSet<Digest> missing = getMissingDigests(digests);
if (!missing.isEmpty()) {
uploadChunks(missing.size(), b.onlyUseDigests(missing).build());
}
int index = 0;
for (Path file : files) {
- if (file.isDirectory()) {
- // TODO(olaola): to implement this for a directory, will need to create or pass a
- // TreeNodeRepository to call uploadTree.
- throw new UnsupportedOperationException("Storing a directory is not yet supported.");
- }
// Add to protobuf.
+ // TODO(olaola): inline small results here.
result
- .addOutputBuilder()
+ .addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
- .getFileMetadataBuilder()
.setDigest(digests.get(index++))
- .setExecutable(file.isExecutable());
+ .setIsExecutable(file.isExecutable());
}
}
@@ -284,11 +291,11 @@ public class GrpcActionCache implements RemoteActionCache {
* @return The key for fetching the file contents blob from cache.
*/
@Override
- public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException {
- ContentDigest digest = ContentDigests.computeDigest(file);
- ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
+ public Digest uploadFileContents(Path file) throws IOException, InterruptedException {
+ Digest digest = Digests.computeDigest(file);
+ ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploadChunks(1, Chunker.from(file, options.grpcMaxChunkSizeBytes));
+ uploadChunks(1, Chunker.from(file));
}
return digest;
}
@@ -300,97 +307,84 @@ public class GrpcActionCache implements RemoteActionCache {
* @return The key for fetching the file contents blob from cache.
*/
@Override
- public ContentDigest uploadFileContents(
+ public Digest uploadFileContents(
ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException {
- ContentDigest digest = ContentDigests.getDigestFromInputCache(input, inputCache);
- ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
+ Digest digest = Digests.getDigestFromInputCache(input, inputCache);
+ ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploadChunks(1, Chunker.from(input, options.grpcMaxChunkSizeBytes, inputCache, execRoot));
+ uploadChunks(1, Chunker.from(input, inputCache, execRoot));
}
return digest;
}
- static class UploadBlobReplyStreamObserver implements StreamObserver<CasUploadBlobReply> {
- private final CountDownLatch finishLatch;
- private final AtomicReference<RuntimeException> exception;
-
- public UploadBlobReplyStreamObserver(
- CountDownLatch finishLatch, AtomicReference<RuntimeException> exception) {
- this.finishLatch = finishLatch;
- this.exception = exception;
- }
-
- @Override
- public void onNext(CasUploadBlobReply reply) {
- if (!reply.getStatus().getSucceeded()) {
- // TODO(olaola): add basic retry logic on transient errors!
- this.exception.compareAndSet(
- null, new RuntimeException(reply.getStatus().getErrorDetail()));
+ private void uploadChunks(int numItems, Chunker chunker)
+ throws InterruptedException, IOException {
+ final CountDownLatch finishLatch = new CountDownLatch(numItems);
+ final AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
+ StreamObserver<WriteRequest> requestObserver = null;
+ String resourceName = "";
+ if (!options.remoteInstanceName.isEmpty()) {
+ resourceName += options.remoteInstanceName + "/";
+ }
+ while (chunker.hasNext()) {
+ Chunker.Chunk chunk = chunker.next();
+ final Digest digest = chunk.getDigest();
+ long offset = chunk.getOffset();
+ WriteRequest.Builder request = WriteRequest.newBuilder();
+ if (offset == 0) { // Beginning of new upload.
+ numItems--;
+ request.setResourceName(
+ resourceName
+ + "uploads/"
+ + UUID.randomUUID()
+ + "/blobs/"
+ + digest.getHash()
+ + "/"
+ + digest.getSizeBytes());
+ // The batches execute simultaneously.
+ requestObserver =
+ bsStub
+ .get()
+ .write(
+ new StreamObserver<WriteResponse>() {
+ private long bytesLeft = digest.getSizeBytes();
+
+ @Override
+ public void onNext(WriteResponse reply) {
+ bytesLeft -= reply.getCommittedSize();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ exception.compareAndSet(
+ null, new StatusRuntimeException(Status.fromThrowable(t)));
+ finishLatch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ if (bytesLeft != 0) {
+ exception.compareAndSet(
+ null, new RuntimeException("Server did not commit all data."));
+ }
+ finishLatch.countDown();
+ }
+ });
}
- }
-
- @Override
- public void onError(Throwable t) {
- this.exception.compareAndSet(null, new StatusRuntimeException(Status.fromThrowable(t)));
- finishLatch.countDown();
- }
-
- @Override
- public void onCompleted() {
- finishLatch.countDown();
- }
- }
-
- private void uploadChunks(int numItems, Chunker blobs) throws InterruptedException, IOException {
- CountDownLatch finishLatch = new CountDownLatch(numItems); // Maximal number of batches.
- AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
- UploadBlobReplyStreamObserver responseObserver = null;
- StreamObserver<CasUploadBlobRequest> requestObserver = null;
- int currentBatchBytes = 0;
- int batchedInputs = 0;
- int batches = 0;
- try {
- while (blobs.hasNext()) {
- BlobChunk chunk = blobs.next();
- if (chunk.hasDigest()) {
- // Determine whether to start next batch.
- final long batchSize = chunk.getDigest().getSizeBytes() + currentBatchBytes;
- if (batchedInputs % options.grpcMaxBatchInputs == 0
- || batchSize > options.grpcMaxBatchSizeBytes) {
- // The batches execute simultaneously.
- if (requestObserver != null) {
- batchedInputs = 0;
- currentBatchBytes = 0;
- requestObserver.onCompleted();
- }
- batches++;
- responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception);
- requestObserver = casIface.uploadBlobAsync(responseObserver);
- }
- batchedInputs++;
- }
- currentBatchBytes += chunk.getData().size();
- requestObserver.onNext(CasUploadBlobRequest.newBuilder().setData(chunk).build());
- if (finishLatch.getCount() == 0) {
- // RPC completed or errored before we finished sending.
- throw new RuntimeException(
- "gRPC terminated prematurely: "
- + (exception.get() != null ? exception.get() : "unknown cause"));
- }
+ byte[] data = chunk.getData();
+ boolean finishWrite = offset + data.length == digest.getSizeBytes();
+ request.setData(ByteString.copyFrom(data)).setWriteOffset(offset).setFinishWrite(finishWrite);
+ requestObserver.onNext(request.build());
+ if (finishWrite) {
+ requestObserver.onCompleted();
}
- } catch (RuntimeException e) {
- // Cancel RPC
- if (requestObserver != null) {
- requestObserver.onError(e);
+ if (finishLatch.getCount() <= numItems) {
+ // Current RPC errored before we finished sending.
+ if (!finishWrite) {
+ chunker.advanceInput();
+ }
}
- throw e;
- }
- if (requestObserver != null) {
- requestObserver.onCompleted(); // Finish last batch.
- }
- while (batches++ < numItems) {
- finishLatch.countDown(); // Non-sent batches.
}
finishLatch.await(options.remoteTimeout, TimeUnit.SECONDS);
if (exception.get() != null) {
@@ -399,33 +393,12 @@ public class GrpcActionCache implements RemoteActionCache {
}
@Override
- public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
- throws InterruptedException {
- ArrayList<ContentDigest> digests = new ArrayList<>();
- Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes);
- for (byte[] blob : blobs) {
- digests.add(ContentDigests.computeDigest(blob));
- b.addInput(blob);
- }
- ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
+ public Digest uploadBlob(byte[] blob) throws InterruptedException {
+ Digest digest = Digests.computeDigest(blob);
+ ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
try {
if (!missing.isEmpty()) {
- uploadChunks(missing.size(), b.onlyUseDigests(missing).build());
- }
- return ImmutableList.copyOf(digests);
- } catch (IOException e) {
- // This will never happen.
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public ContentDigest uploadBlob(byte[] blob) throws InterruptedException {
- ContentDigest digest = ContentDigests.computeDigest(blob);
- ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
- try {
- if (!missing.isEmpty()) {
- uploadChunks(1, Chunker.from(blob, options.grpcMaxChunkSizeBytes));
+ uploadChunks(1, Chunker.from(blob));
}
return digest;
} catch (IOException e) {
@@ -435,69 +408,20 @@ public class GrpcActionCache implements RemoteActionCache {
}
@Override
- public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException {
- return downloadBlobs(ImmutableList.of(digest)).get(0);
- }
-
- @Override
- public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
- throws CacheNotFoundException {
- // Send all the file requests in a single synchronous batch.
- // TODO(olaola): profile to maybe replace with separate concurrent requests.
- CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder();
- for (ContentDigest digest : digests) {
- if (digest.getSizeBytes() > 0) {
- request.addDigest(digest); // We handle empty blobs locally.
- }
+ public byte[] downloadBlob(Digest digest) throws CacheNotFoundException {
+ if (digest.getSizeBytes() == 0) {
+ return new byte[0];
}
- Iterator<CasDownloadReply> replies = null;
- Map<ContentDigest, byte[]> results = new HashMap<>();
- int digestCount = request.getDigestCount();
- if (digestCount > 0) {
- replies = casIface.downloadBlob(request.build());
- while (digestCount-- > 0) {
- Preconditions.checkArgument(replies.hasNext());
- CasDownloadReply reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
- }
- BlobChunk chunk = reply.getData();
- ContentDigest digest = chunk.getDigest();
- // This is not enough, but better than nothing.
- Preconditions.checkArgument(digest.getSizeBytes() / 1000.0 < MAX_MEMORY_KBYTES);
- byte[] result = new byte[(int) digest.getSizeBytes()];
- ByteString data = chunk.getData();
- data.copyTo(result, 0);
- int offset = data.size();
- while (offset < result.length) {
- Preconditions.checkArgument(replies.hasNext());
- reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
- }
- chunk = reply.getData();
- Preconditions.checkArgument(!chunk.hasDigest());
- Preconditions.checkArgument(chunk.getOffset() == offset);
- data = chunk.getData();
- data.copyTo(result, offset);
- offset += data.size();
- }
- results.put(digest, result);
- }
- }
-
- ArrayList<byte[]> result = new ArrayList<>();
- for (ContentDigest digest : digests) {
- if (digest.getSizeBytes() == 0) {
- result.add(new byte[0]);
- continue;
- }
- if (!results.containsKey(digest)) {
- throw new CacheNotFoundException(digest);
- }
- result.add(results.get(digest));
+ Iterator<ReadResponse> replies = readBlob(digest);
+ byte[] result = new byte[(int) digest.getSizeBytes()];
+ int offset = 0;
+ while (replies.hasNext()) {
+ ByteString data = replies.next().getData();
+ data.copyTo(result, offset);
+ offset += data.size();
}
- return ImmutableList.copyOf(result);
+ Preconditions.checkState(digest.getSizeBytes() == offset);
+ return result;
}
// Execution Cache API
@@ -505,30 +429,39 @@ public class GrpcActionCache implements RemoteActionCache {
/** Returns a cached result for a given Action digest, or null if not found in cache. */
@Override
public ActionResult getCachedActionResult(ActionKey actionKey) {
- ExecutionCacheRequest request =
- ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build();
- ExecutionCacheReply reply = iface.getCachedResult(request);
- ExecutionCacheStatus status = reply.getStatus();
- if (!status.getSucceeded()
- && status.getError() != ExecutionCacheStatus.ErrorCode.MISSING_RESULT) {
- throw new RuntimeException(status.getErrorDetail());
+ try {
+ return acBlockingStub
+ .get()
+ .getActionResult(
+ GetActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .build());
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
+ return null;
+ }
+ throw e;
}
- return reply.hasResult() ? reply.getResult() : null;
}
/** Sets the given result as result of the given Action. */
@Override
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws InterruptedException {
- ExecutionCacheSetRequest request =
- ExecutionCacheSetRequest.newBuilder()
- .setActionDigest(actionKey.getDigest())
- .setResult(result)
- .build();
- ExecutionCacheSetReply reply = iface.setCachedResult(request);
- ExecutionCacheStatus status = reply.getStatus();
- if (!status.getSucceeded() && status.getError() != ExecutionCacheStatus.ErrorCode.UNSUPPORTED) {
- throw new RuntimeException(status.getErrorDetail());
+ try {
+ acBlockingStub
+ .get()
+ .updateActionResult(
+ UpdateActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .setActionResult(result)
+ .build());
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
+ throw e;
+ }
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java
deleted file mode 100644
index 529ff9c3db..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import io.grpc.stub.StreamObserver;
-import java.util.Iterator;
-
-/**
- * An abstraction layer between the remote execution client and gRPC to support unit testing. This
- * interface covers the CAS RPC methods, see {@link CasServiceBlockingStub} and
- * {@link CasServiceStub}.
- */
-public interface GrpcCasInterface {
- CasLookupReply lookup(CasLookupRequest request);
- CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request);
- CasDownloadTreeMetadataReply downloadTreeMetadata(CasDownloadTreeMetadataRequest request);
- Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request);
- StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
- StreamObserver<CasUploadBlobReply> responseObserver);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java
deleted file mode 100644
index 375c2ab359..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-
-/**
- * An abstraction layer between the remote execution client and gRPC to support unit testing. This
- * interface covers the execution cache RPC methods, see {@link ExecutionCacheServiceBlockingStub}.
- */
-public interface GrpcExecutionCacheInterface {
- ExecutionCacheReply getCachedResult(ExecutionCacheRequest request);
- ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java
deleted file mode 100644
index fd44792b4a..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import java.util.Iterator;
-
-/**
- * An abstraction layer between the remote execution client and gRPC to support unit testing. This
- * interface covers the remote execution RPC methods, see {@link ExecuteServiceBlockingStub}.
- */
-public interface GrpcExecutionInterface {
- Iterator<ExecuteReply> execute(ExecuteRequest request);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java
deleted file mode 100644
index 6e100ae1d0..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java
+++ /dev/null
@@ -1,131 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
-import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import io.grpc.Channel;
-import io.grpc.stub.StreamObserver;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
-
-/** Implementations of the gRPC interfaces that actually talk to gRPC. */
-public class GrpcInterfaces {
- /** Create a {@link GrpcCasInterface} instance that actually talks to gRPC. */
- public static GrpcCasInterface casInterface(
- final int grpcTimeoutSeconds,
- final Channel channel,
- final ChannelOptions channelOptions) {
- return new GrpcCasInterface() {
- private CasServiceBlockingStub getCasServiceBlockingStub() {
- return CasServiceGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- private CasServiceStub getCasServiceStub() {
- return CasServiceGrpc.newStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- @Override
- public CasLookupReply lookup(CasLookupRequest request) {
- return getCasServiceBlockingStub().lookup(request);
- }
-
- @Override
- public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) {
- return getCasServiceBlockingStub().uploadTreeMetadata(request);
- }
-
- @Override
- public CasDownloadTreeMetadataReply downloadTreeMetadata(
- CasDownloadTreeMetadataRequest request) {
- return getCasServiceBlockingStub().downloadTreeMetadata(request);
- }
-
- @Override
- public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) {
- return getCasServiceBlockingStub().downloadBlob(request);
- }
-
- @Override
- public StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
- StreamObserver<CasUploadBlobReply> responseObserver) {
- return getCasServiceStub().uploadBlob(responseObserver);
- }
- };
- }
-
- /** Create a {@link GrpcCasInterface} instance that actually talks to gRPC. */
- public static GrpcExecutionCacheInterface executionCacheInterface(
- final int grpcTimeoutSeconds,
- final Channel channel,
- final ChannelOptions channelOptions) {
- return new GrpcExecutionCacheInterface() {
- private ExecutionCacheServiceBlockingStub getExecutionCacheServiceBlockingStub() {
- return ExecutionCacheServiceGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- @Override
- public ExecutionCacheReply getCachedResult(ExecutionCacheRequest request) {
- return getExecutionCacheServiceBlockingStub().getCachedResult(request);
- }
-
- @Override
- public ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request) {
- return getExecutionCacheServiceBlockingStub().setCachedResult(request);
- }
- };
- }
-
- /** Create a {@link GrpcExecutionInterface} instance that actually talks to gRPC. */
- public static GrpcExecutionInterface executionInterface(
- final int grpcTimeoutSeconds,
- final Channel channel,
- final ChannelOptions channelOptions) {
- return new GrpcExecutionInterface() {
- @Override
- public Iterator<ExecuteReply> execute(ExecuteRequest request) {
- ExecuteServiceBlockingStub stub =
- ExecuteServiceGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(
- grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
- return stub.execute(request);
- }
- };
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index eb79c91919..1950a724c3 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -15,58 +15,54 @@
package com.google.devtools.build.lib.remote;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import io.grpc.ManagedChannel;
-import java.util.Iterator;
+import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub;
+import com.google.longrunning.Operation;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.Durations;
+import io.grpc.Channel;
+import io.grpc.protobuf.StatusProto;
+import java.util.concurrent.TimeUnit;
/** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */
@ThreadSafe
-public class GrpcRemoteExecutor extends GrpcActionCache {
+public class GrpcRemoteExecutor {
+ private final RemoteOptions options;
+ private final ChannelOptions channelOptions;
+ private final Channel channel;
+
public static boolean isRemoteExecutionOptions(RemoteOptions options) {
return options.remoteExecutor != null;
}
- private final GrpcExecutionInterface executionIface;
-
- public GrpcRemoteExecutor(
- RemoteOptions options,
- GrpcCasInterface casIface,
- GrpcExecutionCacheInterface cacheIface,
- GrpcExecutionInterface executionIface) {
- super(options, casIface, cacheIface);
- this.executionIface = executionIface;
- }
-
- public GrpcRemoteExecutor(
- ManagedChannel channel, ChannelOptions channelOptions, RemoteOptions options) {
- super(
- options,
- GrpcInterfaces.casInterface(options.remoteTimeout, channel, channelOptions),
- GrpcInterfaces.executionCacheInterface(
- options.remoteTimeout, channel, channelOptions));
- this.executionIface =
- GrpcInterfaces.executionInterface(options.remoteTimeout, channel, channelOptions);
+ public GrpcRemoteExecutor(Channel channel, ChannelOptions channelOptions, RemoteOptions options) {
+ this.options = options;
+ this.channelOptions = channelOptions;
+ this.channel = channel;
}
- public ExecuteReply executeRemotely(ExecuteRequest request) {
- Iterator<ExecuteReply> replies = executionIface.execute(request);
- ExecuteReply reply = null;
- while (replies.hasNext()) {
- reply = replies.next();
- // We can handle the action execution progress here.
+ public ExecuteResponse executeRemotely(ExecuteRequest request) {
+ // TODO(olaola): handle longrunning Operations by using the Watcher API to wait for results.
+ // For now, only support actions with wait_for_completion = true.
+ Preconditions.checkArgument(request.getWaitForCompletion());
+ int actionSeconds = (int) Durations.toSeconds(request.getAction().getTimeout());
+ ExecutionBlockingStub stub =
+ ExecutionGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout + actionSeconds, TimeUnit.SECONDS);
+ Operation op = stub.execute(request);
+ Preconditions.checkState(op.getDone());
+ Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET);
+ if (op.getResultCase() == Operation.ResultCase.ERROR) {
+ throw StatusProto.toStatusRuntimeException(op.getError());
}
- if (reply == null) {
- return ExecuteReply.newBuilder()
- .setStatus(
- ExecutionStatus.newBuilder()
- .setExecuted(false)
- .setSucceeded(false)
- .setError(ExecutionStatus.ErrorCode.UNKNOWN_ERROR)
- .setErrorDetail("Remote server terminated the connection"))
- .build();
+ try {
+ return op.getResponse().unpack(ExecuteResponse.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
}
- return reply;
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/README.md b/src/main/java/com/google/devtools/build/lib/remote/README.md
index 4c85707e71..2525240b93 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/README.md
+++ b/src/main/java/com/google/devtools/build/lib/remote/README.md
@@ -13,7 +13,7 @@ When it's configured to use a remote cache, Bazel computes a hash for each actio
Starting at version 0.5.0, Bazel supports two caching protocols:
1. a HTTP-based REST protocol
-2. [a gRPC-based protocol](https://github.com/bazelbuild/bazel/blob/master/src/main/protobuf/remote_protocol.proto)
+2. [a gRPC-based protocol](https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto)
## Remote caching using the HTTP REST protocol
@@ -101,7 +101,7 @@ this directory to include security control.
## Remote caching using the gRPC protocol
-We're working on a [gRPC protocol](https://github.com/bazelbuild/bazel/blob/master/src/main/protobuf/remote_protocol.proto)
+We're working on a [gRPC protocol](https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto)
that supports both remote caching and remote execution. As of this writing, there is only a single server-side implementation, which is not intended for production use.
### Bazel Setup
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
index 44a63c4d40..84fdb29430 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
@@ -14,15 +14,14 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Digest;
import java.io.IOException;
import java.util.Collection;
import javax.annotation.Nullable;
@@ -32,8 +31,8 @@ import javax.annotation.Nullable;
interface RemoteActionCache {
// CAS API
- // TODO(olaola): create a unified set of exceptions raised by the cache to encapsulate the
- // underlying CasStatus messages and gRPC errors errors.
+ // TODO(buchgr): consider removing the CacheNotFoundException, and replacing it with other
+ // ways to signal a cache miss.
/**
* Upload enough of the tree metadata and data into remote cache so that the entire tree can be
@@ -45,7 +44,7 @@ interface RemoteActionCache {
/**
* Download the entire tree data rooted by the given digest and write it into the given location.
*/
- void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException;
/**
@@ -68,7 +67,7 @@ interface RemoteActionCache {
*
* @return The key for fetching the file contents blob from cache.
*/
- ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException;
+ Digest uploadFileContents(Path file) throws IOException, InterruptedException;
/**
* Put the input file contents in cache if it is not already in it. No-op if the data is already
@@ -76,22 +75,14 @@ interface RemoteActionCache {
*
* @return The key for fetching the file contents blob from cache.
*/
- ContentDigest uploadFileContents(
- ActionInput input, Path execRoot, ActionInputFileCache inputCache)
+ Digest uploadFileContents(ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException;
- /** Upload the given blobs to the cache, and return their digests. */
- ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException;
-
/** Upload the given blob to the cache, and return its digests. */
- ContentDigest uploadBlob(byte[] blob) throws InterruptedException;
+ Digest uploadBlob(byte[] blob) throws InterruptedException;
/** Download and return a blob with a given digest from the cache. */
- byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException;
-
- /** Download and return blobs with given digests from the cache. */
- ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
- throws CacheNotFoundException;
+ byte[] downloadBlob(Digest digest) throws CacheNotFoundException;
// Execution Cache API
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index fcdb44a14a..d779aa7073 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -72,30 +72,6 @@ public final class RemoteOptions extends OptionsBase {
public String remoteCache;
@Option(
- name = "grpc_max_chunk_size_bytes",
- defaultValue = "16000",
- category = "remote",
- help = "The maximal number of data bytes to be sent in a single message."
- )
- public int grpcMaxChunkSizeBytes;
-
- @Option(
- name = "grpc_max_batch_inputs",
- defaultValue = "100",
- category = "remote",
- help = "The maximal number of input files to be sent in a single batch."
- )
- public int grpcMaxBatchInputs;
-
- @Option(
- name = "grpc_max_batch_size_bytes",
- defaultValue = "10485760", // 10MB
- category = "remote",
- help = "The maximal number of input bytes to be sent in a single batch."
- )
- public int grpcMaxBatchSizeBytes;
-
- @Option(
name = "remote_timeout",
defaultValue = "60",
category = "remote",
@@ -134,4 +110,12 @@ public final class RemoteOptions extends OptionsBase {
help = "Temporary, for testing only. Manually set a Platform to pass to remote execution."
)
public String experimentalRemotePlatformOverride;
+
+ @Option(
+ name = "remote_instance_name",
+ defaultValue = "",
+ category = "remote",
+ help = "Value to pass as instance_name in the remote execution API."
+ )
+ public String remoteInstanceName;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index bbb0166e4e..8807d0652b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -14,32 +14,28 @@
package com.google.devtools.build.lib.remote;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.Spawn;
-import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
+import com.google.devtools.build.lib.actions.Spawns;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.exec.SpawnResult;
import com.google.devtools.build.lib.exec.SpawnResult.Status;
import com.google.devtools.build.lib.exec.SpawnRunner;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.StatusRuntimeException;
@@ -49,9 +45,7 @@ import java.util.List;
import java.util.SortedMap;
import java.util.TreeSet;
-/**
- * A client for the remote execution service.
- */
+/** A client for the remote execution service. */
@ThreadSafe
final class RemoteSpawnRunner implements SpawnRunner {
private final Path execRoot;
@@ -60,11 +54,13 @@ final class RemoteSpawnRunner implements SpawnRunner {
private final Platform platform;
private final GrpcRemoteExecutor executor;
+ private final GrpcActionCache remoteCache;
RemoteSpawnRunner(
Path execRoot,
RemoteOptions options,
- GrpcRemoteExecutor executor) {
+ GrpcRemoteExecutor executor,
+ GrpcActionCache remoteCache) {
this.execRoot = execRoot;
this.options = options;
if (options.experimentalRemotePlatformOverride != null) {
@@ -79,27 +75,12 @@ final class RemoteSpawnRunner implements SpawnRunner {
platform = null;
}
this.executor = executor;
- }
-
- RemoteSpawnRunner(
- Path execRoot,
- RemoteOptions options,
- AuthAndTLSOptions authTlsOptions) {
- this(execRoot, options, connect(options, authTlsOptions));
- }
-
- private static GrpcRemoteExecutor connect(RemoteOptions options,
- AuthAndTLSOptions authTlsOptions) {
- Preconditions.checkArgument(GrpcRemoteExecutor.isRemoteExecutionOptions(options));
- ChannelOptions channelOptions = ChannelOptions.create(authTlsOptions,
- options.grpcMaxChunkSizeBytes);
- return new GrpcRemoteExecutor(
- RemoteUtils.createChannel(options.remoteExecutor, channelOptions), channelOptions, options);
+ this.remoteCache = remoteCache;
}
@Override
public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy)
- throws InterruptedException, IOException {
+ throws ExecException, InterruptedException, IOException {
ActionExecutionMetadata owner = spawn.getResourceOwner();
if (owner.getOwner() != null) {
policy.report(ProgressStatus.EXECUTING);
@@ -116,46 +97,38 @@ final class RemoteSpawnRunner implements SpawnRunner {
Action action =
buildAction(
spawn.getOutputFiles(),
- ContentDigests.computeDigest(command),
- repository.getMerkleDigest(inputRoot));
+ Digests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot),
+ // TODO(olaola): set sensible local and remote timeouts.
+ Spawns.getTimeoutSeconds(spawn, 120));
- ActionKey actionKey = ContentDigests.computeActionKey(action);
+ ActionKey actionKey = Digests.computeActionKey(action);
ActionResult result =
- this.options.remoteAcceptCached ? executor.getCachedActionResult(actionKey) : null;
+ options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
if (result == null) {
// Cache miss or we don't accept cache hits.
// Upload the command and all the inputs into the remote cache.
- executor.uploadBlob(command.toByteArray());
- // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
- executor.uploadTree(repository, execRoot, inputRoot);
+ remoteCache.uploadBlob(command.toByteArray());
+ remoteCache.uploadTree(repository, execRoot, inputRoot);
// TODO(olaola): set BuildInfo and input total bytes as well.
ExecuteRequest.Builder request =
ExecuteRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
.setAction(action)
- .setAcceptCached(this.options.remoteAcceptCached)
+ .setWaitForCompletion(true)
.setTotalInputFileCount(inputMap.size())
- .setTimeoutMillis(policy.getTimeoutMillis());
- ExecuteReply reply = executor.executeRemotely(request.build());
- ExecutionStatus status = reply.getStatus();
-
- if (!status.getSucceeded()
- && (status.getError() != ExecutionStatus.ErrorCode.EXEC_FAILED)) {
- return new SpawnResult.Builder()
- // TODO(ulfjack): Improve the translation of the error status.
- .setStatus(Status.EXECUTION_FAILED)
- .setExitCode(-1)
- .build();
- }
-
- result = reply.getResult();
+ .setSkipCacheLookup(!options.remoteAcceptCached);
+ result = executor.executeRemotely(request.build()).getResult();
}
// TODO(ulfjack): Download stdout, stderr, and the output files in a single call.
- passRemoteOutErr(executor, result, policy.getFileOutErr());
- executor.downloadAllResults(result, execRoot);
+ passRemoteOutErr(remoteCache, result, policy.getFileOutErr());
+ if (result.getExitCode() == 0) {
+ remoteCache.downloadAllResults(result, execRoot);
+ }
return new SpawnResult.Builder()
- .setStatus(Status.SUCCESS)
- .setExitCode(result.getReturnCode())
+ .setStatus(Status.SUCCESS) // Even if the action failed with non-zero exit code.
+ .setExitCode(result.getExitCode())
.build();
} catch (StatusRuntimeException | CacheNotFoundException e) {
throw new IOException(e);
@@ -163,37 +136,58 @@ final class RemoteSpawnRunner implements SpawnRunner {
}
private Action buildAction(
- Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Collection<? extends ActionInput> outputs,
+ Digest command,
+ Digest inputRoot,
+ long timeoutSeconds) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
// Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
for (ActionInput output : outputs) {
- action.addOutputPath(output.getExecPathString());
+ // TODO: output directories should be handled here, when they are supported.
+ action.addOutputFiles(output.getExecPathString());
}
if (platform != null) {
action.setPlatform(platform);
}
+ action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds));
return action.build();
}
private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
Command.Builder command = Command.newBuilder();
- command.addAllArgv(arguments);
+ command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(environment.keySet());
for (String var : variables) {
- command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var));
}
return command.build();
}
private static void passRemoteOutErr(
- RemoteActionCache cache, ActionResult result, FileOutErr outErr)
- throws CacheNotFoundException {
- ImmutableList<byte[]> streams =
- cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
- outErr.printOut(new String(streams.get(0), UTF_8));
- outErr.printErr(new String(streams.get(1), UTF_8));
+ RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException {
+ try {
+ if (!result.getStdoutRaw().isEmpty()) {
+ result.getStdoutRaw().writeTo(outErr.getOutputStream());
+ outErr.getOutputStream().flush();
+ } else if (result.hasStdoutDigest()) {
+ byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest());
+ outErr.getOutputStream().write(stdoutBytes);
+ outErr.getOutputStream().flush();
+ }
+ if (!result.getStderrRaw().isEmpty()) {
+ result.getStderrRaw().writeTo(outErr.getErrorStream());
+ outErr.getErrorStream().flush();
+ } else if (result.hasStderrDigest()) {
+ byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest());
+ outErr.getErrorStream().write(stderrBytes);
+ outErr.getErrorStream().flush();
+ }
+ } catch (CacheNotFoundException e) {
+ outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss.");
+ outErr.getOutputStream().flush();
+ }
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
index f3e12f07ee..3f7698252e 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
@@ -15,7 +15,6 @@
package com.google.devtools.build.lib.remote;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.ActionInput;
@@ -32,15 +31,7 @@ import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.exec.SpawnInputExpander;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.rules.fileset.FilesetActionContext;
import com.google.devtools.build.lib.standalone.StandaloneSpawnStrategy;
@@ -48,6 +39,14 @@ import com.google.devtools.build.lib.util.CommandFailureUtils;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.StatusRuntimeException;
@@ -88,12 +87,12 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
this.standaloneStrategy = new StandaloneSpawnStrategy(execRoot, verboseFailures, productName);
this.verboseFailures = verboseFailures;
this.remoteOptions = remoteOptions;
- channelOptions = ChannelOptions.create(authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes);
+ channelOptions = ChannelOptions.create(authTlsOptions);
if (remoteOptions.experimentalRemotePlatformOverride != null) {
Platform.Builder platformBuilder = Platform.newBuilder();
try {
- TextFormat.getParser().merge(remoteOptions.experimentalRemotePlatformOverride,
- platformBuilder);
+ TextFormat.getParser()
+ .merge(remoteOptions.experimentalRemotePlatformOverride, platformBuilder);
} catch (ParseException e) {
throw new IllegalArgumentException(
"Failed to parse --experimental_remote_platform_override", e);
@@ -105,27 +104,32 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
private Action buildAction(
- Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Collection<? extends ActionInput> outputs,
+ Digest command,
+ Digest inputRoot,
+ long timeoutSeconds) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
// Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
for (ActionInput output : outputs) {
- action.addOutputPath(output.getExecPathString());
+ // TODO: output directories should be handled here, when they are supported.
+ action.addOutputFiles(output.getExecPathString());
}
if (platform != null) {
action.setPlatform(platform);
}
+ action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds));
return action.build();
}
private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
Command.Builder command = Command.newBuilder();
- command.addAllArgv(arguments);
+ command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(environment.keySet());
for (String var : variables) {
- command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var));
}
return command.build();
}
@@ -137,11 +141,11 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
private void execLocally(
Spawn spawn,
ActionExecutionContext actionExecutionContext,
- RemoteActionCache actionCache,
+ RemoteActionCache remoteCache,
ActionKey actionKey)
throws ExecException, InterruptedException {
standaloneStrategy.exec(spawn, actionExecutionContext);
- if (remoteOptions.remoteUploadLocalResults && actionCache != null && actionKey != null) {
+ if (remoteOptions.remoteUploadLocalResults && remoteCache != null && actionKey != null) {
ArrayList<Path> outputFiles = new ArrayList<>();
for (ActionInput output : spawn.getOutputFiles()) {
Path outputFile = execRoot.getRelative(output.getExecPathString());
@@ -155,17 +159,17 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
try {
ActionResult.Builder result = ActionResult.newBuilder();
- actionCache.uploadAllResults(execRoot, outputFiles, result);
+ remoteCache.uploadAllResults(execRoot, outputFiles, result);
FileOutErr outErr = actionExecutionContext.getFileOutErr();
if (outErr.getErrorPath().exists()) {
- ContentDigest stderr = actionCache.uploadFileContents(outErr.getErrorPath());
+ Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath());
result.setStderrDigest(stderr);
}
if (outErr.getOutputPath().exists()) {
- ContentDigest stdout = actionCache.uploadFileContents(outErr.getOutputPath());
+ Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath());
result.setStdoutDigest(stdout);
}
- actionCache.setCachedActionResult(actionKey, result.build());
+ remoteCache.setCachedActionResult(actionKey, result.build());
// Handle all cache errors here.
} catch (IOException e) {
throw new UserExecException("Unexpected IO error.", e);
@@ -188,21 +192,25 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
private static void passRemoteOutErr(
RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException {
try {
- ImmutableList<byte[]> streams =
- cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
- byte[] stdout = streams.get(0);
- byte[] stderr = streams.get(1);
- // Don't round-trip through String - write to the output streams directly.
- if (stdout.length != 0) {
- outErr.getOutputStream().write(stdout);
+ if (!result.getStdoutRaw().isEmpty()) {
+ result.getStdoutRaw().writeTo(outErr.getOutputStream());
+ outErr.getOutputStream().flush();
+ } else if (result.hasStdoutDigest()) {
+ byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest());
+ outErr.getOutputStream().write(stdoutBytes);
outErr.getOutputStream().flush();
}
- if (stderr.length != 0) {
- outErr.getErrorStream().write(stderr);
+ if (!result.getStderrRaw().isEmpty()) {
+ result.getStderrRaw().writeTo(outErr.getErrorStream());
+ outErr.getErrorStream().flush();
+ } else if (result.hasStderrDigest()) {
+ byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest());
+ outErr.getErrorStream().write(stderrBytes);
outErr.getErrorStream().flush();
}
} catch (CacheNotFoundException e) {
- // Ignoring.
+ outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss.");
+ outErr.getOutputStream().flush();
}
}
@@ -220,19 +228,23 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
Executor executor = actionExecutionContext.getExecutor();
EventHandler eventHandler = executor.getEventHandler();
- RemoteActionCache actionCache = null;
+ RemoteActionCache remoteCache = null;
GrpcRemoteExecutor workExecutor = null;
if (spawn.isRemotable()) {
// Initialize remote cache and execution handlers. We use separate handlers for every
// action to enable server-side parallelism (need a different gRPC channel per action).
if (SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)) {
- actionCache = new SimpleBlobStoreActionCache(SimpleBlobStoreFactory.create(remoteOptions));
+ remoteCache = new SimpleBlobStoreActionCache(SimpleBlobStoreFactory.create(remoteOptions));
} else if (GrpcActionCache.isRemoteCacheOptions(remoteOptions)) {
- actionCache = new GrpcActionCache(remoteOptions, channelOptions);
+ remoteCache =
+ new GrpcActionCache(
+ RemoteUtils.createChannel(remoteOptions.remoteCache, channelOptions),
+ channelOptions,
+ remoteOptions);
}
- // Otherwise actionCache remains null and remote caching/execution are disabled.
+ // Otherwise remoteCache remains null and remote caching/execution are disabled.
- if (actionCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(remoteOptions)) {
+ if (remoteCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(remoteOptions)) {
workExecutor =
new GrpcRemoteExecutor(
RemoteUtils.createChannel(remoteOptions.remoteExecutor, channelOptions),
@@ -240,15 +252,16 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
remoteOptions);
}
}
- if (!spawn.isRemotable() || actionCache == null) {
+ if (!spawn.isRemotable() || remoteCache == null) {
standaloneStrategy.exec(spawn, actionExecutionContext);
return;
}
if (executor.reportsSubcommands()) {
executor.reportSubcommand(spawn);
}
- executor.getEventBus().post(
- ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote"));
+ executor
+ .getEventBus()
+ .post(ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote"));
try {
// Temporary hack: the TreeNodeRepository should be created and maintained upstream!
@@ -266,22 +279,25 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
Action action =
buildAction(
spawn.getOutputFiles(),
- ContentDigests.computeDigest(command),
- repository.getMerkleDigest(inputRoot));
+ Digests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot),
+ // TODO(olaola): set sensible local and remote timouts.
+ Spawns.getTimeoutSeconds(spawn, 120));
// Look up action cache, and reuse the action output if it is found.
- actionKey = ContentDigests.computeActionKey(action);
- ActionResult result = this.remoteOptions.remoteAcceptCached
- ? actionCache.getCachedActionResult(actionKey)
- : null;
+ actionKey = Digests.computeActionKey(action);
+ ActionResult result =
+ this.remoteOptions.remoteAcceptCached
+ ? remoteCache.getCachedActionResult(actionKey)
+ : null;
boolean acceptCachedResult = this.remoteOptions.remoteAcceptCached;
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try {
- actionCache.downloadAllResults(result, execRoot);
- passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
+ remoteCache.downloadAllResults(result, execRoot);
+ passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr());
return;
} catch (CacheNotFoundException e) {
acceptCachedResult = false; // Retry the action remotely and invalidate the results.
@@ -289,47 +305,39 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
if (workExecutor == null) {
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
return;
}
// Upload the command and all the inputs into the remote cache.
- actionCache.uploadBlob(command.toByteArray());
- // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
- actionCache.uploadTree(repository, execRoot, inputRoot);
+ remoteCache.uploadBlob(command.toByteArray());
+ remoteCache.uploadTree(repository, execRoot, inputRoot);
// TODO(olaola): set BuildInfo and input total bytes as well.
ExecuteRequest.Builder request =
ExecuteRequest.newBuilder()
+ .setInstanceName(remoteOptions.remoteInstanceName)
.setAction(action)
- .setAcceptCached(acceptCachedResult)
+ .setWaitForCompletion(true)
.setTotalInputFileCount(inputMap.size())
- .setTimeoutMillis(1000 * Spawns.getTimeoutSeconds(spawn, 120));
- // TODO(olaola): set sensible local and remote timouts.
- ExecuteReply reply = workExecutor.executeRemotely(request.build());
- ExecutionStatus status = reply.getStatus();
+ .setSkipCacheLookup(!acceptCachedResult);
+ ExecuteResponse reply = workExecutor.executeRemotely(request.build());
result = reply.getResult();
- // We do not want to pass on the remote stdout and strerr if we are going to retry the
- // action.
- if (status.getSucceeded()) {
- passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
- actionCache.downloadAllResults(result, execRoot);
- if (result.getReturnCode() != 0) {
- String cwd = executor.getExecRoot().getPathString();
- String message =
- CommandFailureUtils.describeCommandFailure(
- verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd);
- throw new UserExecException(message + ": Exit " + result.getReturnCode());
- }
+ if (remoteOptions.remoteLocalFallback && result.getExitCode() != 0) {
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
return;
}
- if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED
- || !remoteOptions.remoteLocalFallback) {
- passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
- throw new UserExecException(status.getErrorDetail());
+ passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr());
+ if (result.getExitCode() == 0) {
+ remoteCache.downloadAllResults(result, execRoot);
+ } else {
+ String cwd = executor.getExecRoot().getPathString();
+ String message =
+ CommandFailureUtils.describeCommandFailure(
+ verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd);
+ throw new UserExecException(message + ": Exit " + result.getExitCode());
}
// For now, we retry locally on all other remote errors.
// TODO(olaola): add remote retries on cache miss errors.
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
} catch (IOException e) {
throw new UserExecException("Unexpected IO error.", e);
} catch (InterruptedException e) {
@@ -343,14 +351,15 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
eventHandler.handle(Event.warn(mnemonic + " remote work failed (" + e + ")" + stackTrace));
if (remoteOptions.remoteLocalFallback) {
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
} else {
throw new UserExecException(e);
}
} catch (CacheNotFoundException e) {
+ // TODO(olaola): handle this exception by reuploading / reexecuting the action remotely.
eventHandler.handle(Event.warn(mnemonic + " remote work results cache miss (" + e + ")"));
if (remoteOptions.remoteLocalFallback) {
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
} else {
throw new UserExecException(e);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
index 845f5411fe..58b2c94bf8 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
@@ -26,8 +26,7 @@ public final class RemoteUtils {
NettyChannelBuilder builder =
NettyChannelBuilder.forTarget(target)
.negotiationType(
- channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT)
- .maxMessageSize(channelOptions.maxMessageSize());
+ channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT);
if (channelOptions.getSslContext() != null) {
builder.sslContext(channelOptions.getSslContext());
if (channelOptions.getTlsAuthorityOverride() != null) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index bb1ca512d5..75d208a687 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -14,28 +14,27 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
+import com.google.devtools.remoteexecution.v1test.DirectoryNode;
+import com.google.devtools.remoteexecution.v1test.FileNode;
+import com.google.devtools.remoteexecution.v1test.OutputDirectory;
+import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Semaphore;
@@ -59,8 +58,8 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
- for (FileNode fileNode : repository.treeToFileNodes(root)) {
- uploadBlob(fileNode.toByteArray());
+ for (Directory directory : repository.treeToDirectories(root)) {
+ uploadBlob(directory.toByteArray());
}
// TODO(ulfjack): Only upload files that aren't in the CAS yet?
for (TreeNode leaf : repository.leaves(root)) {
@@ -69,26 +68,26 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
@Override
- public void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ public void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException {
- FileNode fileNode = FileNode.parseFrom(downloadBlob(rootDigest));
- if (fileNode.hasFileMetadata()) {
- FileMetadata meta = fileNode.getFileMetadata();
- downloadFileContents(meta.getDigest(), rootLocation, meta.getExecutable());
+ Directory directory = Directory.parseFrom(downloadBlob(rootDigest));
+ for (FileNode file : directory.getFilesList()) {
+ downloadFileContents(
+ file.getDigest(), rootLocation.getRelative(file.getName()), file.getIsExecutable());
}
- for (FileNode.Child child : fileNode.getChildList()) {
- downloadTree(child.getDigest(), rootLocation.getRelative(child.getPath()));
+ for (DirectoryNode child : directory.getDirectoriesList()) {
+ downloadTree(child.getDigest(), rootLocation.getRelative(child.getName()));
}
}
@Override
- public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException {
+ public Digest uploadFileContents(Path file) throws IOException, InterruptedException {
// This unconditionally reads the whole file into memory first!
return uploadBlob(ByteString.readFrom(file.getInputStream()).toByteArray());
}
@Override
- public ContentDigest uploadFileContents(
+ public Digest uploadFileContents(
ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException {
// This unconditionally reads the whole file into memory first!
@@ -96,26 +95,31 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
((VirtualActionInput) input).writeTo(buffer);
byte[] blob = buffer.toByteArray();
- return uploadBlob(blob, ContentDigests.computeDigest(blob));
+ return uploadBlob(blob, Digests.computeDigest(blob));
}
return uploadBlob(
ByteString.readFrom(execRoot.getRelative(input.getExecPathString()).getInputStream())
.toByteArray(),
- ContentDigests.getDigestFromInputCache(input, inputCache));
+ Digests.getDigestFromInputCache(input, inputCache));
}
@Override
public void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException {
- for (Output output : result.getOutputList()) {
- if (output.getContentCase() == ContentCase.FILE_METADATA) {
- FileMetadata m = output.getFileMetadata();
- downloadFileContents(
- m.getDigest(), execRoot.getRelative(output.getPath()), m.getExecutable());
+ for (OutputFile file : result.getOutputFilesList()) {
+ if (!file.getContent().isEmpty()) {
+ createFile(
+ file.getContent().toByteArray(),
+ execRoot.getRelative(file.getPath()),
+ file.getIsExecutable());
} else {
- downloadTree(output.getDigest(), execRoot.getRelative(output.getPath()));
+ downloadFileContents(
+ file.getDigest(), execRoot.getRelative(file.getPath()), file.getIsExecutable());
}
}
+ for (OutputDirectory directory : result.getOutputDirectoriesList()) {
+ downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath()));
+ }
}
@Override
@@ -130,22 +134,25 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
// TreeNodeRepository to call uploadTree.
throw new UnsupportedOperationException("Storing a directory is not yet supported.");
}
+ // TODO(olaola): inline small file contents here.
// First put the file content to cache.
- ContentDigest digest = uploadFileContents(file);
+ Digest digest = uploadFileContents(file);
// Add to protobuf.
result
- .addOutputBuilder()
+ .addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
- .getFileMetadataBuilder()
.setDigest(digest)
- .setExecutable(file.isExecutable());
+ .setIsExecutable(file.isExecutable());
}
}
- private void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
+ private void downloadFileContents(Digest digest, Path dest, boolean executable)
throws IOException, CacheNotFoundException {
// This unconditionally downloads the whole file into memory first!
- byte[] contents = downloadBlob(digest);
+ createFile(downloadBlob(digest), dest, executable);
+ }
+
+ private void createFile(byte[] contents, Path dest, boolean executable) throws IOException {
FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
try (OutputStream stream = dest.getOutputStream()) {
stream.write(contents);
@@ -153,16 +160,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
dest.setExecutable(executable);
}
- @Override
- public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
- throws InterruptedException {
- ArrayList<ContentDigest> digests = new ArrayList<>();
- for (byte[] blob : blobs) {
- digests.add(uploadBlob(blob));
- }
- return ImmutableList.copyOf(digests);
- }
-
private void checkBlobSize(long blobSizeKBytes, String type) {
Preconditions.checkArgument(
blobSizeKBytes < MAX_MEMORY_KBYTES,
@@ -172,16 +169,16 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
@Override
- public ContentDigest uploadBlob(byte[] blob) throws InterruptedException {
- return uploadBlob(blob, ContentDigests.computeDigest(blob));
+ public Digest uploadBlob(byte[] blob) throws InterruptedException {
+ return uploadBlob(blob, Digests.computeDigest(blob));
}
- private ContentDigest uploadBlob(byte[] blob, ContentDigest digest) throws InterruptedException {
+ private Digest uploadBlob(byte[] blob, Digest digest) throws InterruptedException {
int blobSizeKBytes = blob.length / 1024;
checkBlobSize(blobSizeKBytes, "Upload");
uploadMemoryAvailable.acquire(blobSizeKBytes);
try {
- blobStore.put(ContentDigests.toHexString(digest), blob);
+ blobStore.put(digest.getHash(), blob);
} finally {
uploadMemoryAvailable.release(blobSizeKBytes);
}
@@ -189,36 +186,26 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
@Override
- public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException {
+ public byte[] downloadBlob(Digest digest) throws CacheNotFoundException {
if (digest.getSizeBytes() == 0) {
return new byte[0];
}
// This unconditionally downloads the whole blob into memory!
checkBlobSize(digest.getSizeBytes() / 1024, "Download");
- byte[] data = blobStore.get(ContentDigests.toHexString(digest));
+ byte[] data = blobStore.get(digest.getHash());
if (data == null) {
throw new CacheNotFoundException(digest);
}
return data;
}
- @Override
- public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
- throws CacheNotFoundException {
- ArrayList<byte[]> blobs = new ArrayList<>();
- for (ContentDigest c : digests) {
- blobs.add(downloadBlob(c));
- }
- return ImmutableList.copyOf(blobs);
- }
-
- public boolean containsKey(ContentDigest digest) {
- return blobStore.containsKey(ContentDigests.toHexString(digest));
+ public boolean containsKey(Digest digest) {
+ return blobStore.containsKey(digest.getHash());
}
@Override
public ActionResult getCachedActionResult(ActionKey actionKey) {
- byte[] data = blobStore.get(ContentDigests.toHexString(actionKey.getDigest()));
+ byte[] data = blobStore.get(actionKey.getDigest().getHash());
if (data == null) {
return null;
}
@@ -232,6 +219,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
@Override
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws InterruptedException {
- blobStore.put(ContentDigests.toHexString(actionKey.getDigest()), result.toByteArray());
+ blobStore.put(actionKey.getDigest().getHash(), result.toByteArray());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
index 8272295a84..dba3a14c12 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
@@ -29,11 +29,11 @@ import com.google.devtools.build.lib.concurrent.BlazeInterners;
import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.exec.SpawnInputExpander;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
@@ -185,11 +185,11 @@ public final class TreeNodeRepository extends TreeTraverser<TreeNodeRepository.T
// be part of the state.
private final Path execRoot;
private final ActionInputFileCache inputFileCache;
- private final Map<TreeNode, ContentDigest> treeNodeDigestCache = new HashMap<>();
- private final Map<ContentDigest, TreeNode> digestTreeNodeCache = new HashMap<>();
- private final Map<TreeNode, FileNode> fileNodeCache = new HashMap<>();
- private final Map<VirtualActionInput, ContentDigest> virtualInputDigestCache = new HashMap<>();
- private final Map<ContentDigest, VirtualActionInput> digestVirtualInputCache = new HashMap<>();
+ private final Map<TreeNode, Digest> treeNodeDigestCache = new HashMap<>();
+ private final Map<Digest, TreeNode> digestTreeNodeCache = new HashMap<>();
+ private final Map<TreeNode, Directory> directoryCache = new HashMap<>();
+ private final Map<VirtualActionInput, Digest> virtualInputDigestCache = new HashMap<>();
+ private final Map<Digest, VirtualActionInput> digestVirtualInputCache = new HashMap<>();
public TreeNodeRepository(Path execRoot, ActionInputFileCache inputFileCache) {
this.execRoot = execRoot;
@@ -298,125 +298,125 @@ public final class TreeNodeRepository extends TreeTraverser<TreeNodeRepository.T
return interner.intern(new TreeNode(entries));
}
- private synchronized FileNode getOrComputeFileNode(TreeNode node) throws IOException {
+ private synchronized Directory getOrComputeDirectory(TreeNode node) throws IOException {
// Assumes all child digests have already been computed!
- FileNode fileNode = fileNodeCache.get(node);
- if (fileNode == null) {
- FileNode.Builder b = FileNode.newBuilder();
- if (node.isLeaf()) {
- ActionInput input = node.getActionInput();
- if (input instanceof VirtualActionInput) {
- VirtualActionInput virtualInput = (VirtualActionInput) input;
- ContentDigest digest = ContentDigests.computeDigest(virtualInput);
- virtualInputDigestCache.put(virtualInput, digest);
- // There may be multiple inputs with the same digest. In that case, we don't care which
- // one we get back from the digestVirtualInputCache later.
- digestVirtualInputCache.put(digest, virtualInput);
- b.getFileMetadataBuilder()
- .setDigest(digest)
- // We always declare virtual action inputs as non-executable for now.
- .setExecutable(false);
+ Preconditions.checkArgument(!node.isLeaf());
+ Directory directory = directoryCache.get(node);
+ if (directory == null) {
+ Directory.Builder b = Directory.newBuilder();
+ for (TreeNode.ChildEntry entry : node.getChildEntries()) {
+ TreeNode child = entry.getChild();
+ if (child.isLeaf()) {
+ ActionInput input = child.getActionInput();
+ if (input instanceof VirtualActionInput) {
+ VirtualActionInput virtualInput = (VirtualActionInput) input;
+ Digest digest = Digests.computeDigest(virtualInput);
+ virtualInputDigestCache.put(virtualInput, digest);
+ // There may be multiple inputs with the same digest. In that case, we don't care which
+ // one we get back from the digestVirtualInputCache later.
+ digestVirtualInputCache.put(digest, virtualInput);
+ b.addFilesBuilder()
+ .setName(entry.getSegment())
+ .setDigest(digest)
+ .setIsExecutable(false);
+ } else {
+ b.addFilesBuilder()
+ .setName(entry.getSegment())
+ .setDigest(Digests.getDigestFromInputCache(input, inputFileCache))
+ .setIsExecutable(execRoot.getRelative(input.getExecPathString()).isExecutable());
+ }
} else {
- b.getFileMetadataBuilder()
- .setDigest(ContentDigests.getDigestFromInputCache(input, inputFileCache))
- .setExecutable(execRoot.getRelative(input.getExecPathString()).isExecutable());
- }
- } else {
- for (TreeNode.ChildEntry entry : node.getChildEntries()) {
- ContentDigest childDigest = treeNodeDigestCache.get(entry.getChild());
- Preconditions.checkState(childDigest != null);
- b.addChildBuilder().setPath(entry.getSegment()).setDigest(childDigest);
+ Digest childDigest = Preconditions.checkNotNull(treeNodeDigestCache.get(child));
+ b.addDirectoriesBuilder().setName(entry.getSegment()).setDigest(childDigest);
}
}
- fileNode = b.build();
- fileNodeCache.put(node, fileNode);
- ContentDigest digest = ContentDigests.computeDigest(fileNode);
+ directory = b.build();
+ directoryCache.put(node, directory);
+ Digest digest = Digests.computeDigest(directory);
treeNodeDigestCache.put(node, digest);
digestTreeNodeCache.put(digest, node);
}
- return fileNode;
+ return directory;
}
// Recursively traverses the tree, expanding and computing Merkle digests for nodes for which
// they have not yet been computed and cached.
public void computeMerkleDigests(TreeNode root) throws IOException {
synchronized (this) {
- if (fileNodeCache.get(root) != null) {
+ if (directoryCache.get(root) != null) {
// Strong assumption: the cache is valid, i.e. parent present implies children present.
return;
}
}
- if (root.isLeaf()) {
- ActionInput input = root.getActionInput();
- if (!(input instanceof VirtualActionInput)) {
- inputFileCache.getDigest(input);
- }
- } else {
+ if (!root.isLeaf()) {
for (TreeNode child : children(root)) {
computeMerkleDigests(child);
}
+ getOrComputeDirectory(root);
}
- getOrComputeFileNode(root);
}
/**
* Should only be used after computeMerkleDigests has been called on one of the node ancestors.
* Returns the precomputed digest.
*/
- public ContentDigest getMerkleDigest(TreeNode node) {
- return treeNodeDigestCache.get(node);
+ public Digest getMerkleDigest(TreeNode node) throws IOException {
+ return node.isLeaf()
+ ? actionInputToDigest(node.getActionInput())
+ : treeNodeDigestCache.get(node);
}
/**
* Returns the precomputed digests for both data and metadata. Should only be used after
* computeMerkleDigests has been called on one of the node ancestors.
*/
- public ImmutableCollection<ContentDigest> getAllDigests(TreeNode root) throws IOException {
- ImmutableSet.Builder<ContentDigest> digests = ImmutableSet.builder();
+ public ImmutableCollection<Digest> getAllDigests(TreeNode root) throws IOException {
+ ImmutableSet.Builder<Digest> digests = ImmutableSet.builder();
for (TreeNode node : descendants(root)) {
- digests.add(Preconditions.checkNotNull(treeNodeDigestCache.get(node)));
- if (node.isLeaf()) {
- digests.add(actionInputToDigest(node.getActionInput()));
- }
+ digests.add(
+ node.isLeaf()
+ ? actionInputToDigest(node.getActionInput())
+ : Preconditions.checkNotNull(treeNodeDigestCache.get(node)));
}
return digests.build();
}
- private ContentDigest actionInputToDigest(ActionInput input) throws IOException {
+ private Digest actionInputToDigest(ActionInput input) throws IOException {
if (input instanceof VirtualActionInput) {
return Preconditions.checkNotNull(virtualInputDigestCache.get(input));
}
- return ContentDigests.getDigestFromInputCache(input, inputFileCache);
+ return Digests.getDigestFromInputCache(input, inputFileCache);
}
/**
- * Serializes all of the subtree to the file node list. TODO(olaola): add a version that only
- * copies a part of the tree that we are interested in. Should only be used after
- * computeMerkleDigests has been called on one of the node ancestors.
+ * Serializes all of the subtree to a Directory list. TODO(olaola): add a version that only copies
+ * a part of the tree that we are interested in. Should only be used after computeMerkleDigests
+ * has been called on one of the node ancestors.
*/
// Note: this is not, strictly speaking, thread safe. If someone is deleting cached Merkle hashes
// while this is executing, it will trigger an exception. But I think this is WAI.
- public ImmutableList<FileNode> treeToFileNodes(TreeNode root) {
- ImmutableList.Builder<FileNode> fileNodes = ImmutableList.builder();
+ public ImmutableList<Directory> treeToDirectories(TreeNode root) {
+ ImmutableList.Builder<Directory> directories = ImmutableList.builder();
for (TreeNode node : descendants(root)) {
- fileNodes.add(Preconditions.checkNotNull(fileNodeCache.get(node)));
+ if (!node.isLeaf()) {
+ directories.add(Preconditions.checkNotNull(directoryCache.get(node)));
+ }
}
- return fileNodes.build();
+ return directories.build();
}
/**
* Should only be used on digests created by a call to computeMerkleDigests. Looks up ActionInputs
- * or FileNodes by cached digests and adds them to the lists.
+ * or Directory messages by cached digests and adds them to the lists.
*/
public void getDataFromDigests(
- Iterable<ContentDigest> digests, List<ActionInput> actionInputs, List<FileNode> nodes) {
- for (ContentDigest digest : digests) {
+ Iterable<Digest> digests, List<ActionInput> actionInputs, List<Directory> nodes) {
+ for (Digest digest : digests) {
TreeNode treeNode = digestTreeNodeCache.get(digest);
if (treeNode != null) {
- nodes.add(Preconditions.checkNotNull(fileNodeCache.get(treeNode)));
- } else {
- // If not there, it must be an ActionInput.
- ByteString hexDigest = ByteString.copyFromUtf8(ContentDigests.toHexString(digest));
+ nodes.add(Preconditions.checkNotNull(directoryCache.get(treeNode)));
+ } else { // If not there, it must be an ActionInput.
+ ByteString hexDigest = ByteString.copyFromUtf8(digest.getHash());
ActionInput input = inputFileCache.getInputFromDigest(hexDigest);
if (input == null) {
// ... or a VirtualActionInput.