aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/BUILD9
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java10
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java102
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java24
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/Chunker.java218
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/Digests.java (renamed from src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java)61
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java603
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java43
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java27
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java131
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java80
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/README.md4
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java32
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java136
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java163
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java3
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java115
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java138
-rw-r--r--src/main/protobuf/BUILD9
-rw-r--r--src/main/protobuf/remote_protocol.proto329
-rw-r--r--src/test/java/com/google/devtools/build/lib/BUILD9
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java236
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java43
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java21
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java56
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java629
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java383
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java143
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java70
-rwxr-xr-xsrc/test/shell/bazel/remote_execution_test.sh2
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD10
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java649
34 files changed, 2029 insertions, 2517 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 3a3c304d1e..d2e82efa60 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -23,8 +23,6 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/standalone",
"//src/main/java/com/google/devtools/common/options",
- "//src/main/protobuf:remote_protocol_java_grpc",
- "//src/main/protobuf:remote_protocol_java_proto",
"//third_party:apache_httpclient",
"//third_party:apache_httpcore",
"//third_party:auth",
@@ -35,6 +33,13 @@ java_library(
"//third_party:netty",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
+ "//third_party/protobuf:protobuf_java_util",
+ "@googleapis//:google_bytestream_bytestream_java_grpc",
+ "@googleapis//:google_bytestream_bytestream_java_proto",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
+ "@googleapis//:google_longrunning_operations_java_proto",
+ "@googleapis//:google_rpc_error_details_java_proto",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
index 5820e0bf95..cffc58fd4b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
@@ -14,25 +14,25 @@
package com.google.devtools.build.lib.remote;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.remoteexecution.v1test.Digest;
/**
* An exception to indicate cache misses.
* TODO(olaola): have a class of checked RemoteCacheExceptions.
*/
public final class CacheNotFoundException extends Exception {
- private final ContentDigest missingDigest;
+ private final Digest missingDigest;
- CacheNotFoundException(ContentDigest missingDigest) {
+ CacheNotFoundException(Digest missingDigest) {
this.missingDigest = missingDigest;
}
- public ContentDigest getMissingDigest() {
+ public Digest getMissingDigest() {
return missingDigest;
}
@Override
public String toString() {
- return "Missing digest: " + ContentDigests.toString(missingDigest);
+ return "Missing digest: " + missingDigest;
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java
index 87da7c2405..62605e1c2d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java
@@ -13,28 +13,27 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.actions.Spawns;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.exec.SpawnResult;
import com.google.devtools.build.lib.exec.SpawnResult.Status;
import com.google.devtools.build.lib.exec.SpawnRunner;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.StatusRuntimeException;
@@ -56,14 +55,11 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
// TODO(olaola): This will be set on a per-action basis instead.
private final Platform platform;
- private final RemoteActionCache actionCache;
+ private final RemoteActionCache remoteCache;
private final SpawnRunner delegate;
CachedLocalSpawnRunner(
- Path execRoot,
- RemoteOptions options,
- RemoteActionCache actionCache,
- SpawnRunner delegate) {
+ Path execRoot, RemoteOptions options, RemoteActionCache remoteCache, SpawnRunner delegate) {
this.execRoot = execRoot;
this.options = options;
if (options.experimentalRemotePlatformOverride != null) {
@@ -77,15 +73,13 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
} else {
platform = null;
}
- this.actionCache = actionCache;
+ this.remoteCache = remoteCache;
this.delegate = delegate;
}
@Override
- public SpawnResult exec(
- Spawn spawn,
- SpawnExecutionPolicy policy)
- throws InterruptedException, IOException, ExecException {
+ public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy)
+ throws InterruptedException, IOException, ExecException {
ActionKey actionKey = null;
String mnemonic = spawn.getMnemonic();
@@ -100,24 +94,26 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
Action action =
buildAction(
spawn.getOutputFiles(),
- ContentDigests.computeDigest(command),
- repository.getMerkleDigest(inputRoot));
+ Digests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot),
+ // TODO(olaola): set sensible local and remote timeouts.
+ Spawns.getTimeoutSeconds(spawn, 120));
// Look up action cache, and reuse the action output if it is found.
- actionKey = ContentDigests.computeActionKey(action);
+ actionKey = Digests.computeActionKey(action);
ActionResult result =
- this.options.remoteAcceptCached ? actionCache.getCachedActionResult(actionKey) : null;
+ this.options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try {
// TODO(ulfjack): Download stdout, stderr, and the output files in a single call.
- actionCache.downloadAllResults(result, execRoot);
+ remoteCache.downloadAllResults(result, execRoot);
passRemoteOutErr(result, policy.getFileOutErr());
return new SpawnResult.Builder()
.setStatus(Status.SUCCESS)
- .setExitCode(result.getReturnCode())
+ .setExitCode(result.getExitCode())
.build();
} catch (CacheNotFoundException e) {
// TODO(ulfjack): Track down who throws this exception in what cases and double-check that
@@ -138,44 +134,62 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
}
private Action buildAction(
- Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Collection<? extends ActionInput> outputs,
+ Digest command,
+ Digest inputRoot,
+ long timeoutSeconds) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
// Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
for (ActionInput output : outputs) {
- action.addOutputPath(output.getExecPathString());
+ // TODO: output directories should be handled here, when they are supported.
+ action.addOutputFiles(output.getExecPathString());
}
if (platform != null) {
action.setPlatform(platform);
}
+ action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds));
return action.build();
}
- private static Command buildCommand(
- List<String> arguments, ImmutableMap<String, String> environment) {
+ private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
Command.Builder command = Command.newBuilder();
- command.addAllArgv(arguments);
+ command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(environment.keySet());
for (String var : variables) {
- command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var));
}
return command.build();
}
- private void passRemoteOutErr(
- ActionResult result, FileOutErr outErr)
- throws CacheNotFoundException {
- ImmutableList<byte[]> streams =
- actionCache.downloadBlobs(
- ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
- outErr.printOut(new String(streams.get(0), UTF_8));
- outErr.printErr(new String(streams.get(1), UTF_8));
+ private void passRemoteOutErr(ActionResult result, FileOutErr outErr) throws IOException {
+ try {
+ if (!result.getStdoutRaw().isEmpty()) {
+ result.getStdoutRaw().writeTo(outErr.getOutputStream());
+ outErr.getOutputStream().flush();
+ } else if (result.hasStdoutDigest()) {
+ byte[] stdoutBytes = remoteCache.downloadBlob(result.getStdoutDigest());
+ outErr.getOutputStream().write(stdoutBytes);
+ outErr.getOutputStream().flush();
+ }
+ if (!result.getStderrRaw().isEmpty()) {
+ result.getStderrRaw().writeTo(outErr.getErrorStream());
+ outErr.getErrorStream().flush();
+ } else if (result.hasStderrDigest()) {
+ byte[] stderrBytes = remoteCache.downloadBlob(result.getStderrDigest());
+ outErr.getErrorStream().write(stderrBytes);
+ outErr.getErrorStream().flush();
+ }
+ } catch (CacheNotFoundException e) {
+ outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss.");
+ outErr.getOutputStream().flush();
+ }
}
private void writeCacheEntry(Spawn spawn, FileOutErr outErr, ActionKey actionKey)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
ArrayList<Path> outputFiles = new ArrayList<>();
for (ActionInput output : spawn.getOutputFiles()) {
Path outputPath = execRoot.getRelative(output.getExecPathString());
@@ -186,11 +200,11 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
}
}
ActionResult.Builder result = ActionResult.newBuilder();
- actionCache.uploadAllResults(execRoot, outputFiles, result);
- ContentDigest stderr = actionCache.uploadFileContents(outErr.getErrorPath());
- ContentDigest stdout = actionCache.uploadFileContents(outErr.getOutputPath());
+ remoteCache.uploadAllResults(execRoot, outputFiles, result);
+ Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath());
+ Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath());
result.setStderrDigest(stderr);
result.setStdoutDigest(stdout);
- actionCache.setCachedActionResult(actionKey, result.build());
+ remoteCache.setCachedActionResult(actionKey, result.build());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
index c2654586bf..a78458d135 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
@@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.common.options.Options;
import io.grpc.CallCredentials;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.GrpcSslContexts;
@@ -33,24 +34,21 @@ import javax.net.ssl.SSLException;
/** Instantiate all authentication helpers from build options. */
@ThreadSafe
public final class ChannelOptions {
- private final int maxMessageSize;
private final boolean tlsEnabled;
private final SslContext sslContext;
private final String tlsAuthorityOverride;
private final CallCredentials credentials;
- private static final int CHUNK_MESSAGE_OVERHEAD = 1024;
+ public static final ChannelOptions DEFAULT = create(Options.getDefaults(AuthAndTLSOptions.class));
private ChannelOptions(
boolean tlsEnabled,
SslContext sslContext,
String tlsAuthorityOverride,
- CallCredentials credentials,
- int maxMessageSize) {
+ CallCredentials credentials) {
this.tlsEnabled = tlsEnabled;
this.sslContext = sslContext;
this.tlsAuthorityOverride = tlsAuthorityOverride;
this.credentials = credentials;
- this.maxMessageSize = maxMessageSize;
}
public boolean tlsEnabled() {
@@ -69,15 +67,10 @@ public final class ChannelOptions {
return sslContext;
}
- public int maxMessageSize() {
- return maxMessageSize;
- }
-
- public static ChannelOptions create(AuthAndTLSOptions options, int grpcMaxChunkSizeBytes) {
+ public static ChannelOptions create(AuthAndTLSOptions options) {
try {
return create(
options,
- grpcMaxChunkSizeBytes,
options.authCredentials != null
? new FileInputStream(options.authCredentials)
: null);
@@ -88,7 +81,8 @@ public final class ChannelOptions {
}
@VisibleForTesting
- public static ChannelOptions create(AuthAndTLSOptions options, int grpcMaxChunkSizeBytes,
+ public static ChannelOptions create(
+ AuthAndTLSOptions options,
@Nullable InputStream credentialsInputStream) {
boolean tlsEnabled = options.tlsEnabled;
SslContext sslContext = null;
@@ -118,11 +112,7 @@ public final class ChannelOptions {
"Failed initializing auth credentials for remote cache/execution " + e);
}
}
- final int maxMessageSize =
- Math.max(
- 4 * 1024 * 1024 /* GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE */,
- grpcMaxChunkSizeBytes + CHUNK_MESSAGE_OVERHEAD);
return new ChannelOptions(
- tlsEnabled, sslContext, tlsAuthorityOverride, credentials, maxMessageSize);
+ tlsEnabled, sslContext, tlsAuthorityOverride, credentials);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index a9bdd6dc13..0c407a29b7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -14,114 +14,162 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.protobuf.ByteString;
+import com.google.devtools.remoteexecution.v1test.Digest;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
-import javax.annotation.Nullable;
-/** An iterator-type object that transforms byte sources into a stream of BlobChunk messages. */
+/** An iterator-type object that transforms byte sources into a stream of Chunks. */
public final class Chunker {
+ // This is effectively final, should be changed only in unit-tests!
+ private static int DEFAULT_CHUNK_SIZE = 1024 * 16;
+ private static byte[] EMPTY_BLOB = new byte[0];
+
+ @VisibleForTesting
+ static void setDefaultChunkSizeForTesting(int value) {
+ DEFAULT_CHUNK_SIZE = value;
+ }
+
+ public static int getDefaultChunkSize() {
+ return DEFAULT_CHUNK_SIZE;
+ }
+
+ /** A piece of a byte[] blob. */
+ public static final class Chunk {
+
+ private final Digest digest;
+ private final long offset;
+ // TODO(olaola): consider saving data in a different format that byte[].
+ private final byte[] data;
+
+ @VisibleForTesting
+ public Chunk(Digest digest, byte[] data, long offset) {
+ this.digest = digest;
+ this.data = data;
+ this.offset = offset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof Chunk)) {
+ return false;
+ }
+ Chunk other = (Chunk) o;
+ return other.offset == offset
+ && other.digest.equals(digest)
+ && Arrays.equals(other.data, data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(digest, offset, Arrays.hashCode(data));
+ }
+
+ public Digest getDigest() {
+ return digest;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ // This returns a mutable copy, for efficiency.
+ public byte[] getData() {
+ return data;
+ }
+ }
+
/** An Item is an opaque digestable source of bytes. */
interface Item {
- ContentDigest getDigest() throws IOException;
+ Digest getDigest() throws IOException;
InputStream getInputStream() throws IOException;
}
private final Iterator<Item> inputIterator;
private InputStream currentStream;
- private final Set<ContentDigest> digests;
- private ContentDigest digest;
+ private Digest digest;
private long bytesLeft;
private final int chunkSize;
- Chunker(
- Iterator<Item> inputIterator,
- int chunkSize,
- // If present, specifies which digests to output out of the whole input.
- @Nullable Set<ContentDigest> digests)
- throws IOException {
+ Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0");
- this.digests = digests;
this.inputIterator = inputIterator;
this.chunkSize = chunkSize;
advanceInput();
}
- Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
- this(inputIterator, chunkSize, null);
+ Chunker(Item input, int chunkSize) throws IOException {
+ this(Iterators.singletonIterator(input), chunkSize);
}
- Chunker(Item input, int chunkSize) throws IOException {
- this(Iterators.singletonIterator(input), chunkSize, ImmutableSet.of(input.getDigest()));
- }
-
- private void advanceInput() throws IOException {
- do {
- if (inputIterator != null && inputIterator.hasNext()) {
- Item input = inputIterator.next();
- digest = input.getDigest();
- currentStream = input.getInputStream();
- bytesLeft = digest.getSizeBytes();
- } else {
- digest = null;
- currentStream = null;
- bytesLeft = 0;
- }
- } while (digest != null && digests != null && !digests.contains(digest));
+ public void advanceInput() throws IOException {
+ if (inputIterator != null && inputIterator.hasNext()) {
+ Item input = inputIterator.next();
+ digest = input.getDigest();
+ currentStream = input.getInputStream();
+ bytesLeft = digest.getSizeBytes();
+ } else {
+ digest = null;
+ currentStream = null;
+ bytesLeft = 0;
+ }
}
- /** True if the object has more BlobChunk elements. */
+ /** True if the object has more Chunk elements. */
public boolean hasNext() {
return currentStream != null;
}
- /** Consume the next BlobChunk element. */
- public BlobChunk next() throws IOException {
+ /** Consume the next Chunk element. */
+ public Chunk next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
- BlobChunk.Builder chunk = BlobChunk.newBuilder();
- long offset = digest.getSizeBytes() - bytesLeft;
- if (offset == 0) {
- chunk.setDigest(digest);
- } else {
- chunk.setOffset(offset);
- }
+ final long offset = digest.getSizeBytes() - bytesLeft;
+ byte[] blob = EMPTY_BLOB;
if (bytesLeft > 0) {
- byte[] blob = new byte[(int) Math.min(bytesLeft, chunkSize)];
+ blob = new byte[(int) Math.min(bytesLeft, chunkSize)];
currentStream.read(blob);
- chunk.setData(ByteString.copyFrom(blob));
bytesLeft -= blob.length;
}
+ final Chunk result = new Chunk(digest, blob, offset);
if (bytesLeft == 0) {
currentStream.close();
advanceInput(); // Sets the current stream to null, if it was the last.
}
- return chunk.build();
+ return result;
}
static Item toItem(final byte[] blob) {
return new Item() {
+ Digest digest = null;
+
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.computeDigest(blob);
+ public Digest getDigest() throws IOException {
+ if (digest == null) {
+ digest = Digests.computeDigest(blob);
+ }
+ return digest;
}
@Override
@@ -133,9 +181,14 @@ public final class Chunker {
static Item toItem(final Path file) {
return new Item() {
+ Digest digest = null;
+
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.computeDigest(file);
+ public Digest getDigest() throws IOException {
+ if (digest == null) {
+ digest = Digests.computeDigest(file);
+ }
+ return digest;
}
@Override
@@ -152,8 +205,8 @@ public final class Chunker {
}
return new Item() {
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.getDigestFromInputCache(input, inputCache);
+ public Digest getDigest() throws IOException {
+ return Digests.getDigestFromInputCache(input, inputCache);
}
@Override
@@ -165,9 +218,14 @@ public final class Chunker {
static Item toItem(final VirtualActionInput input) {
return new Item() {
+ Digest digest = null;
+
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.computeDigest(input);
+ public Digest getDigest() throws IOException {
+ if (digest == null) {
+ digest = Digests.computeDigest(input);
+ }
+ return digest;
}
@Override
@@ -189,27 +247,67 @@ public final class Chunker {
return new Chunker(toItem(input, inputCache, execRoot), chunkSize);
}
+ /**
+ * Create a Chunker from a given ActionInput, taking its digest from the provided
+ * ActionInputFileCache.
+ */
+ public static Chunker from(ActionInput input, ActionInputFileCache inputCache, Path execRoot)
+ throws IOException {
+ return from(input, getDefaultChunkSize(), inputCache, execRoot);
+ }
+
/** Create a Chunker from a given blob and chunkSize. */
public static Chunker from(byte[] blob, int chunkSize) throws IOException {
return new Chunker(toItem(blob), chunkSize);
}
+ /** Create a Chunker from a given blob. */
+ public static Chunker from(byte[] blob) throws IOException {
+ return from(blob, getDefaultChunkSize());
+ }
+
/** Create a Chunker from a given Path and chunkSize. */
public static Chunker from(Path file, int chunkSize) throws IOException {
return new Chunker(toItem(file), chunkSize);
}
+ /** Create a Chunker from a given Path. */
+ public static Chunker from(Path file) throws IOException {
+ return from(file, getDefaultChunkSize());
+ }
+
+ static class MemberOf implements Predicate<Item> {
+ private final Set<Digest> digests;
+
+ public MemberOf(Set<Digest> digests) {
+ this.digests = digests;
+ }
+
+ @Override
+ public boolean apply(Item item) {
+ try {
+ return digests.contains(item.getDigest());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/**
* Create a Chunker from multiple input sources. The order of the sources provided to the Builder
* will be the same order they will be chunked by.
*/
public static final class Builder {
private final ArrayList<Item> items = new ArrayList<>();
- private Set<ContentDigest> digests = null;
- private int chunkSize = 0;
+ private Set<Digest> digests = null;
+ private int chunkSize = getDefaultChunkSize();
public Chunker build() throws IOException {
- return new Chunker(items.iterator(), chunkSize, digests);
+ return new Chunker(
+ digests == null
+ ? items.iterator()
+ : Iterators.filter(items.iterator(), new MemberOf(digests)),
+ chunkSize);
}
public Builder chunkSize(int chunkSize) {
@@ -221,7 +319,7 @@ public final class Chunker {
* Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS
* uploads where a list of digests missing from the CAS is known.
*/
- public Builder onlyUseDigests(Set<ContentDigest> digests) {
+ public Builder onlyUseDigests(Set<Digest> digests) {
this.digests = digests;
return this;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java b/src/main/java/com/google/devtools/build/lib/remote/Digests.java
index 21a739c468..9121bc2782 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Digests.java
@@ -14,34 +14,35 @@
package com.google.devtools.build.lib.remote;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.protobuf.ByteString;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-/** Helper methods relating to computing ContentDigest messages for remote execution. */
+/** Helper methods relating to computing Digest messages for remote execution. */
@ThreadSafe
-public final class ContentDigests {
- private ContentDigests() {}
+public final class Digests {
+ private Digests() {}
- public static ContentDigest computeDigest(byte[] blob) {
- return buildDigest(Hashing.sha1().hashBytes(blob).asBytes(), blob.length);
+ public static Digest computeDigest(byte[] blob) {
+ return buildDigest(Hashing.sha1().hashBytes(blob).toString(), blob.length);
}
- public static ContentDigest computeDigest(Path file) throws IOException {
+ public static Digest computeDigest(Path file) throws IOException {
return buildDigest(file.getSHA1Digest(), file.getFileSize());
}
- public static ContentDigest computeDigest(VirtualActionInput input) throws IOException {
+ public static Digest computeDigest(VirtualActionInput input) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
input.writeTo(buffer);
return computeDigest(buffer.toByteArray());
@@ -52,22 +53,26 @@ public final class ContentDigests {
* bytes, but this implementation relies on the stability of the proto encoding, in particular
* between different platforms and languages. TODO(olaola): upgrade to a better implementation!
*/
- public static ContentDigest computeDigest(Message message) {
+ public static Digest computeDigest(Message message) {
return computeDigest(message.toByteArray());
}
+ public static Digest computeDigestUtf8(String str) {
+ return computeDigest(str.getBytes(UTF_8));
+ }
+
/**
- * A special type of ContentDigest that is used only as a remote action cache key. This is a
- * separate type in order to prevent accidentally using other ContentDigests as action keys.
+ * A special type of Digest that is used only as a remote action cache key. This is a
+ * separate type in order to prevent accidentally using other Digests as action keys.
*/
public static final class ActionKey {
- private final ContentDigest digest;
+ private final Digest digest;
- public ContentDigest getDigest() {
+ public Digest getDigest() {
return digest;
}
- private ActionKey(ContentDigest digest) {
+ private ActionKey(Digest digest) {
this.digest = digest;
}
}
@@ -77,29 +82,23 @@ public final class ContentDigests {
}
/**
- * Assumes that the given ContentDigest is a valid digest of an Action, and creates an ActionKey
+ * Assumes that the given Digest is a valid digest of an Action, and creates an ActionKey
* wrapper. This should not be called on the client side!
*/
- public static ActionKey unsafeActionKeyFromDigest(ContentDigest digest) {
+ public static ActionKey unsafeActionKeyFromDigest(Digest digest) {
return new ActionKey(digest);
}
- public static ContentDigest buildDigest(byte[] digest, long size) {
- ContentDigest.Builder b = ContentDigest.newBuilder();
- b.setDigest(ByteString.copyFrom(digest)).setSizeBytes(size);
- return b.build();
+ public static Digest buildDigest(byte[] hash, long size) {
+ return buildDigest(HashCode.fromBytes(hash).toString(), size);
}
- public static ContentDigest getDigestFromInputCache(ActionInput input, ActionInputFileCache cache)
- throws IOException {
- return buildDigest(cache.getDigest(input), cache.getSizeInBytes(input));
+ public static Digest buildDigest(String hexHash, long size) {
+ return Digest.newBuilder().setHash(hexHash).setSizeBytes(size).build();
}
- public static String toHexString(ContentDigest digest) {
- return HashCode.fromBytes(digest.getDigest().toByteArray()).toString();
- }
-
- public static String toString(ContentDigest digest) {
- return "<digest: " + toHexString(digest) + ", size: " + digest.getSizeBytes() + " bytes>";
+ public static Digest getDigestFromInputCache(ActionInput input, ActionInputFileCache cache)
+ throws IOException {
+ return buildDigest(cache.getDigest(input), cache.getSizeInBytes(input));
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
index d85125f15c..d93842244d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
@@ -14,52 +14,53 @@
package com.google.devtools.build.lib.remote;
+import com.google.bytestream.ByteStreamGrpc;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
-import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlockingStub;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
+import com.google.devtools.remoteexecution.v1test.OutputDirectory;
+import com.google.devtools.remoteexecution.v1test.OutputFile;
+import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -67,50 +68,76 @@ import java.util.concurrent.atomic.AtomicReference;
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@ThreadSafe
public class GrpcActionCache implements RemoteActionCache {
- private static final int MAX_MEMORY_KBYTES = 512 * 1024;
-
- /** Channel over which to send gRPC CAS queries. */
- private final GrpcCasInterface casIface;
-
- private final GrpcExecutionCacheInterface iface;
private final RemoteOptions options;
-
- public GrpcActionCache(
- RemoteOptions options, GrpcCasInterface casIface, GrpcExecutionCacheInterface iface) {
- this.options = options;
- this.casIface = casIface;
- this.iface = iface;
- }
+ private final ChannelOptions channelOptions;
+ private final Channel channel;
@VisibleForTesting
- public GrpcActionCache(
- Channel channel, RemoteOptions options, ChannelOptions channelOptions) {
+ public GrpcActionCache(Channel channel, ChannelOptions channelOptions, RemoteOptions options) {
this.options = options;
- this.casIface =
- GrpcInterfaces.casInterface(options.remoteTimeout, channel, channelOptions);
- this.iface =
- GrpcInterfaces.executionCacheInterface(options.remoteTimeout, channel, channelOptions);
+ this.channelOptions = channelOptions;
+ this.channel = channel;
}
- public GrpcActionCache(RemoteOptions options, ChannelOptions channelOptions) {
- this(RemoteUtils.createChannel(options.remoteCache, channelOptions), options, channelOptions);
- }
+ // All gRPC stubs are reused.
+ private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ContentAddressableStorageBlockingStub>() {
+ @Override
+ public ContentAddressableStorageBlockingStub get() {
+ return ContentAddressableStorageGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
+ private final Supplier<ByteStreamBlockingStub> bsBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ByteStreamBlockingStub>() {
+ @Override
+ public ByteStreamBlockingStub get() {
+ return ByteStreamGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
+ private final Supplier<ByteStreamStub> bsStub =
+ Suppliers.memoize(
+ new Supplier<ByteStreamStub>() {
+ @Override
+ public ByteStreamStub get() {
+ return ByteStreamGrpc.newStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
+ private final Supplier<ActionCacheBlockingStub> acBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ActionCacheBlockingStub>() {
+ @Override
+ public ActionCacheBlockingStub get() {
+ return ActionCacheGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
public static boolean isRemoteCacheOptions(RemoteOptions options) {
return options.remoteCache != null;
}
- private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) {
- CasLookupRequest.Builder request = CasLookupRequest.newBuilder().addAllDigest(digests);
- if (request.getDigestCount() == 0) {
+ private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests) {
+ FindMissingBlobsRequest.Builder request =
+ FindMissingBlobsRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .addAllBlobDigests(digests);
+ if (request.getBlobDigestsCount() == 0) {
return ImmutableSet.of();
}
- CasStatus status = casIface.lookup(request.build()).getStatus();
- if (!status.getSucceeded() && status.getError() != CasStatus.ErrorCode.MISSING_DIGEST) {
- // TODO(olaola): here and below, add basic retry logic on transient errors!
- throw new RuntimeException(status.getErrorDetail());
- }
- return ImmutableSet.copyOf(status.getMissingDigestList());
+ FindMissingBlobsResponse response = casBlockingStub.get().findMissingBlobs(request.build());
+ return ImmutableSet.copyOf(response.getMissingBlobDigestsList());
}
/**
@@ -122,26 +149,37 @@ public class GrpcActionCache implements RemoteActionCache {
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
// TODO(olaola): avoid querying all the digests, only ask for novel subtrees.
- ImmutableSet<ContentDigest> missingDigests = getMissingDigests(repository.getAllDigests(root));
+ ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root));
// Only upload data that was missing from the cache.
ArrayList<ActionInput> actionInputs = new ArrayList<>();
- ArrayList<FileNode> treeNodes = new ArrayList<>();
+ ArrayList<Directory> treeNodes = new ArrayList<>();
repository.getDataFromDigests(missingDigests, actionInputs, treeNodes);
if (!treeNodes.isEmpty()) {
- CasUploadTreeMetadataRequest.Builder metaRequest =
- CasUploadTreeMetadataRequest.newBuilder().addAllTreeNode(treeNodes);
- CasUploadTreeMetadataReply reply = casIface.uploadTreeMetadata(metaRequest.build());
- if (!reply.getStatus().getSucceeded()) {
- throw new RuntimeException(reply.getStatus().getErrorDetail());
+ // TODO(olaola): split this into multiple requests if total size is > 10MB.
+ BatchUpdateBlobsRequest.Builder treeBlobRequest =
+ BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName);
+ for (Directory d : treeNodes) {
+ final byte[] data = d.toByteArray();
+ treeBlobRequest
+ .addRequestsBuilder()
+ .setContentDigest(Digests.computeDigest(data))
+ .setData(ByteString.copyFrom(data));
+ }
+ BatchUpdateBlobsResponse response =
+ casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build());
+ // TODO(olaola): handle retries on transient errors.
+ for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) {
+ if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) {
+ throw StatusProto.toStatusRuntimeException(r.getStatus());
+ }
}
}
if (!actionInputs.isEmpty()) {
uploadChunks(
actionInputs.size(),
new Chunker.Builder()
- .chunkSize(options.grpcMaxChunkSizeBytes)
.addAllInputs(actionInputs, repository.getInputFileCache(), execRoot)
.onlyUseDigests(missingDigests)
.build());
@@ -152,21 +190,11 @@ public class GrpcActionCache implements RemoteActionCache {
* Download the entire tree data rooted by the given digest and write it into the given location.
*/
@Override
- public void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ public void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException {
throw new UnsupportedOperationException();
}
- private void handleDownloadStatus(CasStatus status) throws CacheNotFoundException {
- if (!status.getSucceeded()) {
- if (status.getError() == CasStatus.ErrorCode.MISSING_DIGEST) {
- throw new CacheNotFoundException(status.getMissingDigest(0));
- }
- // TODO(olaola): deal with other statuses better.
- throw new RuntimeException(status.getErrorDetail());
- }
- }
-
/**
* Download all results of a remotely executed action locally. TODO(olaola): will need to amend to
* include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating.
@@ -174,106 +202,85 @@ public class GrpcActionCache implements RemoteActionCache {
@Override
public void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException {
- if (result.getOutputList().isEmpty()) {
+ if (result.getOutputFilesList().isEmpty() && result.getOutputDirectoriesList().isEmpty()) {
return;
}
- // Send all the file requests in a single synchronous batch.
- // TODO(olaola): profile to maybe replace with separate concurrent requests.
- CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder();
- Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap = new HashMap<>();
- for (Output output : result.getOutputList()) {
- Path path = execRoot.getRelative(output.getPath());
- if (output.getContentCase() == ContentCase.FILE_METADATA) {
- FileMetadata fileMetadata = output.getFileMetadata();
- ContentDigest digest = fileMetadata.getDigest();
- if (digest.getSizeBytes() > 0) {
- request.addDigest(digest);
- metadataMap.put(digest, Pair.of(path, fileMetadata));
- } else {
- // Handle empty file locally.
- FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
- FileSystemUtils.writeContent(path, new byte[0]);
- }
+ for (OutputFile file : result.getOutputFilesList()) {
+ Path path = execRoot.getRelative(file.getPath());
+ FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
+ Digest digest = file.getDigest();
+ if (digest.getSizeBytes() == 0) {
+ // Handle empty file locally.
+ FileSystemUtils.writeContent(path, new byte[0]);
} else {
- downloadTree(output.getDigest(), path);
+ try (OutputStream stream = path.getOutputStream()) {
+ if (!file.getContent().isEmpty()) {
+ file.getContent().writeTo(stream);
+ } else {
+ Iterator<ReadResponse> replies = readBlob(digest);
+ while (replies.hasNext()) {
+ replies.next().getData().writeTo(stream);
+ }
+ }
+ }
}
+ path.setExecutable(file.getIsExecutable());
}
- Iterator<CasDownloadReply> replies = casIface.downloadBlob(request.build());
- Set<ContentDigest> results = new HashSet<>();
- while (replies.hasNext()) {
- results.add(createFileFromStream(metadataMap, replies));
- }
- for (ContentDigest digest : metadataMap.keySet()) {
- if (!results.contains(digest)) {
- throw new CacheNotFoundException(digest);
- }
+ for (OutputDirectory directory : result.getOutputDirectoriesList()) {
+ downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath()));
}
}
- private ContentDigest createFileFromStream(
- Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap, Iterator<CasDownloadReply> replies)
- throws IOException, CacheNotFoundException {
- Preconditions.checkArgument(replies.hasNext());
- CasDownloadReply reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
+ private Iterator<ReadResponse> readBlob(Digest digest) throws CacheNotFoundException {
+ String resourceName = "";
+ if (!options.remoteInstanceName.isEmpty()) {
+ resourceName += options.remoteInstanceName + "/";
}
- BlobChunk chunk = reply.getData();
- ContentDigest digest = chunk.getDigest();
- Preconditions.checkArgument(metadataMap.containsKey(digest));
- Pair<Path, FileMetadata> metadata = metadataMap.get(digest);
- Path path = metadata.first;
- FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
- try (OutputStream stream = path.getOutputStream()) {
- ByteString data = chunk.getData();
- data.writeTo(stream);
- long bytesLeft = digest.getSizeBytes() - data.size();
- while (bytesLeft > 0) {
- Preconditions.checkArgument(replies.hasNext());
- reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
- }
- chunk = reply.getData();
- data = chunk.getData();
- Preconditions.checkArgument(!chunk.hasDigest());
- Preconditions.checkArgument(chunk.getOffset() == digest.getSizeBytes() - bytesLeft);
- data.writeTo(stream);
- bytesLeft -= data.size();
+ resourceName += "blobs/" + digest.getHash() + "/" + digest.getSizeBytes();
+ try {
+ return bsBlockingStub
+ .get()
+ .read(ReadRequest.newBuilder().setResourceName(resourceName).build());
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
+ throw new CacheNotFoundException(digest);
}
- path.setExecutable(metadata.second.getExecutable());
+ throw e;
}
- return digest;
}
/** Upload all results of a locally executed action to the cache. */
@Override
public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result)
throws IOException, InterruptedException {
- ArrayList<ContentDigest> digests = new ArrayList<>();
- Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes);
+ ArrayList<Digest> digests = new ArrayList<>();
+ Chunker.Builder b = new Chunker.Builder();
for (Path file : files) {
- digests.add(ContentDigests.computeDigest(file));
+ if (!file.exists()) {
+ // We ignore requested results that have not been generated by the action.
+ continue;
+ }
+ if (file.isDirectory()) {
+ // TODO(olaola): to implement this for a directory, will need to create or pass a
+ // TreeNodeRepository to call uploadTree.
+ throw new UnsupportedOperationException("Storing a directory is not yet supported.");
+ }
+ digests.add(Digests.computeDigest(file));
b.addInput(file);
}
- ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
+ ImmutableSet<Digest> missing = getMissingDigests(digests);
if (!missing.isEmpty()) {
uploadChunks(missing.size(), b.onlyUseDigests(missing).build());
}
int index = 0;
for (Path file : files) {
- if (file.isDirectory()) {
- // TODO(olaola): to implement this for a directory, will need to create or pass a
- // TreeNodeRepository to call uploadTree.
- throw new UnsupportedOperationException("Storing a directory is not yet supported.");
- }
// Add to protobuf.
+ // TODO(olaola): inline small results here.
result
- .addOutputBuilder()
+ .addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
- .getFileMetadataBuilder()
.setDigest(digests.get(index++))
- .setExecutable(file.isExecutable());
+ .setIsExecutable(file.isExecutable());
}
}
@@ -284,11 +291,11 @@ public class GrpcActionCache implements RemoteActionCache {
* @return The key for fetching the file contents blob from cache.
*/
@Override
- public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException {
- ContentDigest digest = ContentDigests.computeDigest(file);
- ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
+ public Digest uploadFileContents(Path file) throws IOException, InterruptedException {
+ Digest digest = Digests.computeDigest(file);
+ ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploadChunks(1, Chunker.from(file, options.grpcMaxChunkSizeBytes));
+ uploadChunks(1, Chunker.from(file));
}
return digest;
}
@@ -300,97 +307,84 @@ public class GrpcActionCache implements RemoteActionCache {
* @return The key for fetching the file contents blob from cache.
*/
@Override
- public ContentDigest uploadFileContents(
+ public Digest uploadFileContents(
ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException {
- ContentDigest digest = ContentDigests.getDigestFromInputCache(input, inputCache);
- ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
+ Digest digest = Digests.getDigestFromInputCache(input, inputCache);
+ ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploadChunks(1, Chunker.from(input, options.grpcMaxChunkSizeBytes, inputCache, execRoot));
+ uploadChunks(1, Chunker.from(input, inputCache, execRoot));
}
return digest;
}
- static class UploadBlobReplyStreamObserver implements StreamObserver<CasUploadBlobReply> {
- private final CountDownLatch finishLatch;
- private final AtomicReference<RuntimeException> exception;
-
- public UploadBlobReplyStreamObserver(
- CountDownLatch finishLatch, AtomicReference<RuntimeException> exception) {
- this.finishLatch = finishLatch;
- this.exception = exception;
- }
-
- @Override
- public void onNext(CasUploadBlobReply reply) {
- if (!reply.getStatus().getSucceeded()) {
- // TODO(olaola): add basic retry logic on transient errors!
- this.exception.compareAndSet(
- null, new RuntimeException(reply.getStatus().getErrorDetail()));
+ private void uploadChunks(int numItems, Chunker chunker)
+ throws InterruptedException, IOException {
+ final CountDownLatch finishLatch = new CountDownLatch(numItems);
+ final AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
+ StreamObserver<WriteRequest> requestObserver = null;
+ String resourceName = "";
+ if (!options.remoteInstanceName.isEmpty()) {
+ resourceName += options.remoteInstanceName + "/";
+ }
+ while (chunker.hasNext()) {
+ Chunker.Chunk chunk = chunker.next();
+ final Digest digest = chunk.getDigest();
+ long offset = chunk.getOffset();
+ WriteRequest.Builder request = WriteRequest.newBuilder();
+ if (offset == 0) { // Beginning of new upload.
+ numItems--;
+ request.setResourceName(
+ resourceName
+ + "uploads/"
+ + UUID.randomUUID()
+ + "/blobs/"
+ + digest.getHash()
+ + "/"
+ + digest.getSizeBytes());
+ // The batches execute simultaneously.
+ requestObserver =
+ bsStub
+ .get()
+ .write(
+ new StreamObserver<WriteResponse>() {
+ private long bytesLeft = digest.getSizeBytes();
+
+ @Override
+ public void onNext(WriteResponse reply) {
+ bytesLeft -= reply.getCommittedSize();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ exception.compareAndSet(
+ null, new StatusRuntimeException(Status.fromThrowable(t)));
+ finishLatch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ if (bytesLeft != 0) {
+ exception.compareAndSet(
+ null, new RuntimeException("Server did not commit all data."));
+ }
+ finishLatch.countDown();
+ }
+ });
}
- }
-
- @Override
- public void onError(Throwable t) {
- this.exception.compareAndSet(null, new StatusRuntimeException(Status.fromThrowable(t)));
- finishLatch.countDown();
- }
-
- @Override
- public void onCompleted() {
- finishLatch.countDown();
- }
- }
-
- private void uploadChunks(int numItems, Chunker blobs) throws InterruptedException, IOException {
- CountDownLatch finishLatch = new CountDownLatch(numItems); // Maximal number of batches.
- AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
- UploadBlobReplyStreamObserver responseObserver = null;
- StreamObserver<CasUploadBlobRequest> requestObserver = null;
- int currentBatchBytes = 0;
- int batchedInputs = 0;
- int batches = 0;
- try {
- while (blobs.hasNext()) {
- BlobChunk chunk = blobs.next();
- if (chunk.hasDigest()) {
- // Determine whether to start next batch.
- final long batchSize = chunk.getDigest().getSizeBytes() + currentBatchBytes;
- if (batchedInputs % options.grpcMaxBatchInputs == 0
- || batchSize > options.grpcMaxBatchSizeBytes) {
- // The batches execute simultaneously.
- if (requestObserver != null) {
- batchedInputs = 0;
- currentBatchBytes = 0;
- requestObserver.onCompleted();
- }
- batches++;
- responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception);
- requestObserver = casIface.uploadBlobAsync(responseObserver);
- }
- batchedInputs++;
- }
- currentBatchBytes += chunk.getData().size();
- requestObserver.onNext(CasUploadBlobRequest.newBuilder().setData(chunk).build());
- if (finishLatch.getCount() == 0) {
- // RPC completed or errored before we finished sending.
- throw new RuntimeException(
- "gRPC terminated prematurely: "
- + (exception.get() != null ? exception.get() : "unknown cause"));
- }
+ byte[] data = chunk.getData();
+ boolean finishWrite = offset + data.length == digest.getSizeBytes();
+ request.setData(ByteString.copyFrom(data)).setWriteOffset(offset).setFinishWrite(finishWrite);
+ requestObserver.onNext(request.build());
+ if (finishWrite) {
+ requestObserver.onCompleted();
}
- } catch (RuntimeException e) {
- // Cancel RPC
- if (requestObserver != null) {
- requestObserver.onError(e);
+ if (finishLatch.getCount() <= numItems) {
+ // Current RPC errored before we finished sending.
+ if (!finishWrite) {
+ chunker.advanceInput();
+ }
}
- throw e;
- }
- if (requestObserver != null) {
- requestObserver.onCompleted(); // Finish last batch.
- }
- while (batches++ < numItems) {
- finishLatch.countDown(); // Non-sent batches.
}
finishLatch.await(options.remoteTimeout, TimeUnit.SECONDS);
if (exception.get() != null) {
@@ -399,33 +393,12 @@ public class GrpcActionCache implements RemoteActionCache {
}
@Override
- public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
- throws InterruptedException {
- ArrayList<ContentDigest> digests = new ArrayList<>();
- Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes);
- for (byte[] blob : blobs) {
- digests.add(ContentDigests.computeDigest(blob));
- b.addInput(blob);
- }
- ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
+ public Digest uploadBlob(byte[] blob) throws InterruptedException {
+ Digest digest = Digests.computeDigest(blob);
+ ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
try {
if (!missing.isEmpty()) {
- uploadChunks(missing.size(), b.onlyUseDigests(missing).build());
- }
- return ImmutableList.copyOf(digests);
- } catch (IOException e) {
- // This will never happen.
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public ContentDigest uploadBlob(byte[] blob) throws InterruptedException {
- ContentDigest digest = ContentDigests.computeDigest(blob);
- ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
- try {
- if (!missing.isEmpty()) {
- uploadChunks(1, Chunker.from(blob, options.grpcMaxChunkSizeBytes));
+ uploadChunks(1, Chunker.from(blob));
}
return digest;
} catch (IOException e) {
@@ -435,69 +408,20 @@ public class GrpcActionCache implements RemoteActionCache {
}
@Override
- public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException {
- return downloadBlobs(ImmutableList.of(digest)).get(0);
- }
-
- @Override
- public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
- throws CacheNotFoundException {
- // Send all the file requests in a single synchronous batch.
- // TODO(olaola): profile to maybe replace with separate concurrent requests.
- CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder();
- for (ContentDigest digest : digests) {
- if (digest.getSizeBytes() > 0) {
- request.addDigest(digest); // We handle empty blobs locally.
- }
+ public byte[] downloadBlob(Digest digest) throws CacheNotFoundException {
+ if (digest.getSizeBytes() == 0) {
+ return new byte[0];
}
- Iterator<CasDownloadReply> replies = null;
- Map<ContentDigest, byte[]> results = new HashMap<>();
- int digestCount = request.getDigestCount();
- if (digestCount > 0) {
- replies = casIface.downloadBlob(request.build());
- while (digestCount-- > 0) {
- Preconditions.checkArgument(replies.hasNext());
- CasDownloadReply reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
- }
- BlobChunk chunk = reply.getData();
- ContentDigest digest = chunk.getDigest();
- // This is not enough, but better than nothing.
- Preconditions.checkArgument(digest.getSizeBytes() / 1000.0 < MAX_MEMORY_KBYTES);
- byte[] result = new byte[(int) digest.getSizeBytes()];
- ByteString data = chunk.getData();
- data.copyTo(result, 0);
- int offset = data.size();
- while (offset < result.length) {
- Preconditions.checkArgument(replies.hasNext());
- reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
- }
- chunk = reply.getData();
- Preconditions.checkArgument(!chunk.hasDigest());
- Preconditions.checkArgument(chunk.getOffset() == offset);
- data = chunk.getData();
- data.copyTo(result, offset);
- offset += data.size();
- }
- results.put(digest, result);
- }
- }
-
- ArrayList<byte[]> result = new ArrayList<>();
- for (ContentDigest digest : digests) {
- if (digest.getSizeBytes() == 0) {
- result.add(new byte[0]);
- continue;
- }
- if (!results.containsKey(digest)) {
- throw new CacheNotFoundException(digest);
- }
- result.add(results.get(digest));
+ Iterator<ReadResponse> replies = readBlob(digest);
+ byte[] result = new byte[(int) digest.getSizeBytes()];
+ int offset = 0;
+ while (replies.hasNext()) {
+ ByteString data = replies.next().getData();
+ data.copyTo(result, offset);
+ offset += data.size();
}
- return ImmutableList.copyOf(result);
+ Preconditions.checkState(digest.getSizeBytes() == offset);
+ return result;
}
// Execution Cache API
@@ -505,30 +429,39 @@ public class GrpcActionCache implements RemoteActionCache {
/** Returns a cached result for a given Action digest, or null if not found in cache. */
@Override
public ActionResult getCachedActionResult(ActionKey actionKey) {
- ExecutionCacheRequest request =
- ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build();
- ExecutionCacheReply reply = iface.getCachedResult(request);
- ExecutionCacheStatus status = reply.getStatus();
- if (!status.getSucceeded()
- && status.getError() != ExecutionCacheStatus.ErrorCode.MISSING_RESULT) {
- throw new RuntimeException(status.getErrorDetail());
+ try {
+ return acBlockingStub
+ .get()
+ .getActionResult(
+ GetActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .build());
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
+ return null;
+ }
+ throw e;
}
- return reply.hasResult() ? reply.getResult() : null;
}
/** Sets the given result as result of the given Action. */
@Override
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws InterruptedException {
- ExecutionCacheSetRequest request =
- ExecutionCacheSetRequest.newBuilder()
- .setActionDigest(actionKey.getDigest())
- .setResult(result)
- .build();
- ExecutionCacheSetReply reply = iface.setCachedResult(request);
- ExecutionCacheStatus status = reply.getStatus();
- if (!status.getSucceeded() && status.getError() != ExecutionCacheStatus.ErrorCode.UNSUPPORTED) {
- throw new RuntimeException(status.getErrorDetail());
+ try {
+ acBlockingStub
+ .get()
+ .updateActionResult(
+ UpdateActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .setActionResult(result)
+ .build());
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
+ throw e;
+ }
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java
deleted file mode 100644
index 529ff9c3db..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import io.grpc.stub.StreamObserver;
-import java.util.Iterator;
-
-/**
- * An abstraction layer between the remote execution client and gRPC to support unit testing. This
- * interface covers the CAS RPC methods, see {@link CasServiceBlockingStub} and
- * {@link CasServiceStub}.
- */
-public interface GrpcCasInterface {
- CasLookupReply lookup(CasLookupRequest request);
- CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request);
- CasDownloadTreeMetadataReply downloadTreeMetadata(CasDownloadTreeMetadataRequest request);
- Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request);
- StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
- StreamObserver<CasUploadBlobReply> responseObserver);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java
deleted file mode 100644
index 375c2ab359..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-
-/**
- * An abstraction layer between the remote execution client and gRPC to support unit testing. This
- * interface covers the execution cache RPC methods, see {@link ExecutionCacheServiceBlockingStub}.
- */
-public interface GrpcExecutionCacheInterface {
- ExecutionCacheReply getCachedResult(ExecutionCacheRequest request);
- ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java
deleted file mode 100644
index fd44792b4a..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import java.util.Iterator;
-
-/**
- * An abstraction layer between the remote execution client and gRPC to support unit testing. This
- * interface covers the remote execution RPC methods, see {@link ExecuteServiceBlockingStub}.
- */
-public interface GrpcExecutionInterface {
- Iterator<ExecuteReply> execute(ExecuteRequest request);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java
deleted file mode 100644
index 6e100ae1d0..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java
+++ /dev/null
@@ -1,131 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
-import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import io.grpc.Channel;
-import io.grpc.stub.StreamObserver;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
-
-/** Implementations of the gRPC interfaces that actually talk to gRPC. */
-public class GrpcInterfaces {
- /** Create a {@link GrpcCasInterface} instance that actually talks to gRPC. */
- public static GrpcCasInterface casInterface(
- final int grpcTimeoutSeconds,
- final Channel channel,
- final ChannelOptions channelOptions) {
- return new GrpcCasInterface() {
- private CasServiceBlockingStub getCasServiceBlockingStub() {
- return CasServiceGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- private CasServiceStub getCasServiceStub() {
- return CasServiceGrpc.newStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- @Override
- public CasLookupReply lookup(CasLookupRequest request) {
- return getCasServiceBlockingStub().lookup(request);
- }
-
- @Override
- public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) {
- return getCasServiceBlockingStub().uploadTreeMetadata(request);
- }
-
- @Override
- public CasDownloadTreeMetadataReply downloadTreeMetadata(
- CasDownloadTreeMetadataRequest request) {
- return getCasServiceBlockingStub().downloadTreeMetadata(request);
- }
-
- @Override
- public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) {
- return getCasServiceBlockingStub().downloadBlob(request);
- }
-
- @Override
- public StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
- StreamObserver<CasUploadBlobReply> responseObserver) {
- return getCasServiceStub().uploadBlob(responseObserver);
- }
- };
- }
-
- /** Create a {@link GrpcCasInterface} instance that actually talks to gRPC. */
- public static GrpcExecutionCacheInterface executionCacheInterface(
- final int grpcTimeoutSeconds,
- final Channel channel,
- final ChannelOptions channelOptions) {
- return new GrpcExecutionCacheInterface() {
- private ExecutionCacheServiceBlockingStub getExecutionCacheServiceBlockingStub() {
- return ExecutionCacheServiceGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- @Override
- public ExecutionCacheReply getCachedResult(ExecutionCacheRequest request) {
- return getExecutionCacheServiceBlockingStub().getCachedResult(request);
- }
-
- @Override
- public ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request) {
- return getExecutionCacheServiceBlockingStub().setCachedResult(request);
- }
- };
- }
-
- /** Create a {@link GrpcExecutionInterface} instance that actually talks to gRPC. */
- public static GrpcExecutionInterface executionInterface(
- final int grpcTimeoutSeconds,
- final Channel channel,
- final ChannelOptions channelOptions) {
- return new GrpcExecutionInterface() {
- @Override
- public Iterator<ExecuteReply> execute(ExecuteRequest request) {
- ExecuteServiceBlockingStub stub =
- ExecuteServiceGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(
- grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
- return stub.execute(request);
- }
- };
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index eb79c91919..1950a724c3 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -15,58 +15,54 @@
package com.google.devtools.build.lib.remote;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import io.grpc.ManagedChannel;
-import java.util.Iterator;
+import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub;
+import com.google.longrunning.Operation;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.Durations;
+import io.grpc.Channel;
+import io.grpc.protobuf.StatusProto;
+import java.util.concurrent.TimeUnit;
/** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */
@ThreadSafe
-public class GrpcRemoteExecutor extends GrpcActionCache {
+public class GrpcRemoteExecutor {
+ private final RemoteOptions options;
+ private final ChannelOptions channelOptions;
+ private final Channel channel;
+
public static boolean isRemoteExecutionOptions(RemoteOptions options) {
return options.remoteExecutor != null;
}
- private final GrpcExecutionInterface executionIface;
-
- public GrpcRemoteExecutor(
- RemoteOptions options,
- GrpcCasInterface casIface,
- GrpcExecutionCacheInterface cacheIface,
- GrpcExecutionInterface executionIface) {
- super(options, casIface, cacheIface);
- this.executionIface = executionIface;
- }
-
- public GrpcRemoteExecutor(
- ManagedChannel channel, ChannelOptions channelOptions, RemoteOptions options) {
- super(
- options,
- GrpcInterfaces.casInterface(options.remoteTimeout, channel, channelOptions),
- GrpcInterfaces.executionCacheInterface(
- options.remoteTimeout, channel, channelOptions));
- this.executionIface =
- GrpcInterfaces.executionInterface(options.remoteTimeout, channel, channelOptions);
+ public GrpcRemoteExecutor(Channel channel, ChannelOptions channelOptions, RemoteOptions options) {
+ this.options = options;
+ this.channelOptions = channelOptions;
+ this.channel = channel;
}
- public ExecuteReply executeRemotely(ExecuteRequest request) {
- Iterator<ExecuteReply> replies = executionIface.execute(request);
- ExecuteReply reply = null;
- while (replies.hasNext()) {
- reply = replies.next();
- // We can handle the action execution progress here.
+ public ExecuteResponse executeRemotely(ExecuteRequest request) {
+ // TODO(olaola): handle longrunning Operations by using the Watcher API to wait for results.
+ // For now, only support actions with wait_for_completion = true.
+ Preconditions.checkArgument(request.getWaitForCompletion());
+ int actionSeconds = (int) Durations.toSeconds(request.getAction().getTimeout());
+ ExecutionBlockingStub stub =
+ ExecutionGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout + actionSeconds, TimeUnit.SECONDS);
+ Operation op = stub.execute(request);
+ Preconditions.checkState(op.getDone());
+ Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET);
+ if (op.getResultCase() == Operation.ResultCase.ERROR) {
+ throw StatusProto.toStatusRuntimeException(op.getError());
}
- if (reply == null) {
- return ExecuteReply.newBuilder()
- .setStatus(
- ExecutionStatus.newBuilder()
- .setExecuted(false)
- .setSucceeded(false)
- .setError(ExecutionStatus.ErrorCode.UNKNOWN_ERROR)
- .setErrorDetail("Remote server terminated the connection"))
- .build();
+ try {
+ return op.getResponse().unpack(ExecuteResponse.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
}
- return reply;
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/README.md b/src/main/java/com/google/devtools/build/lib/remote/README.md
index 4c85707e71..2525240b93 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/README.md
+++ b/src/main/java/com/google/devtools/build/lib/remote/README.md
@@ -13,7 +13,7 @@ When it's configured to use a remote cache, Bazel computes a hash for each actio
Starting at version 0.5.0, Bazel supports two caching protocols:
1. a HTTP-based REST protocol
-2. [a gRPC-based protocol](https://github.com/bazelbuild/bazel/blob/master/src/main/protobuf/remote_protocol.proto)
+2. [a gRPC-based protocol](https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto)
## Remote caching using the HTTP REST protocol
@@ -101,7 +101,7 @@ this directory to include security control.
## Remote caching using the gRPC protocol
-We're working on a [gRPC protocol](https://github.com/bazelbuild/bazel/blob/master/src/main/protobuf/remote_protocol.proto)
+We're working on a [gRPC protocol](https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto)
that supports both remote caching and remote execution. As of this writing, there is only a single server-side implementation, which is not intended for production use.
### Bazel Setup
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
index 44a63c4d40..84fdb29430 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
@@ -14,15 +14,14 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Digest;
import java.io.IOException;
import java.util.Collection;
import javax.annotation.Nullable;
@@ -32,8 +31,8 @@ import javax.annotation.Nullable;
interface RemoteActionCache {
// CAS API
- // TODO(olaola): create a unified set of exceptions raised by the cache to encapsulate the
- // underlying CasStatus messages and gRPC errors errors.
+ // TODO(buchgr): consider removing the CacheNotFoundException, and replacing it with other
+ // ways to signal a cache miss.
/**
* Upload enough of the tree metadata and data into remote cache so that the entire tree can be
@@ -45,7 +44,7 @@ interface RemoteActionCache {
/**
* Download the entire tree data rooted by the given digest and write it into the given location.
*/
- void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException;
/**
@@ -68,7 +67,7 @@ interface RemoteActionCache {
*
* @return The key for fetching the file contents blob from cache.
*/
- ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException;
+ Digest uploadFileContents(Path file) throws IOException, InterruptedException;
/**
* Put the input file contents in cache if it is not already in it. No-op if the data is already
@@ -76,22 +75,14 @@ interface RemoteActionCache {
*
* @return The key for fetching the file contents blob from cache.
*/
- ContentDigest uploadFileContents(
- ActionInput input, Path execRoot, ActionInputFileCache inputCache)
+ Digest uploadFileContents(ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException;
- /** Upload the given blobs to the cache, and return their digests. */
- ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException;
-
/** Upload the given blob to the cache, and return its digests. */
- ContentDigest uploadBlob(byte[] blob) throws InterruptedException;
+ Digest uploadBlob(byte[] blob) throws InterruptedException;
/** Download and return a blob with a given digest from the cache. */
- byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException;
-
- /** Download and return blobs with given digests from the cache. */
- ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
- throws CacheNotFoundException;
+ byte[] downloadBlob(Digest digest) throws CacheNotFoundException;
// Execution Cache API
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index fcdb44a14a..d779aa7073 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -72,30 +72,6 @@ public final class RemoteOptions extends OptionsBase {
public String remoteCache;
@Option(
- name = "grpc_max_chunk_size_bytes",
- defaultValue = "16000",
- category = "remote",
- help = "The maximal number of data bytes to be sent in a single message."
- )
- public int grpcMaxChunkSizeBytes;
-
- @Option(
- name = "grpc_max_batch_inputs",
- defaultValue = "100",
- category = "remote",
- help = "The maximal number of input files to be sent in a single batch."
- )
- public int grpcMaxBatchInputs;
-
- @Option(
- name = "grpc_max_batch_size_bytes",
- defaultValue = "10485760", // 10MB
- category = "remote",
- help = "The maximal number of input bytes to be sent in a single batch."
- )
- public int grpcMaxBatchSizeBytes;
-
- @Option(
name = "remote_timeout",
defaultValue = "60",
category = "remote",
@@ -134,4 +110,12 @@ public final class RemoteOptions extends OptionsBase {
help = "Temporary, for testing only. Manually set a Platform to pass to remote execution."
)
public String experimentalRemotePlatformOverride;
+
+ @Option(
+ name = "remote_instance_name",
+ defaultValue = "",
+ category = "remote",
+ help = "Value to pass as instance_name in the remote execution API."
+ )
+ public String remoteInstanceName;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index bbb0166e4e..8807d0652b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -14,32 +14,28 @@
package com.google.devtools.build.lib.remote;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.Spawn;
-import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
+import com.google.devtools.build.lib.actions.Spawns;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.exec.SpawnResult;
import com.google.devtools.build.lib.exec.SpawnResult.Status;
import com.google.devtools.build.lib.exec.SpawnRunner;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.StatusRuntimeException;
@@ -49,9 +45,7 @@ import java.util.List;
import java.util.SortedMap;
import java.util.TreeSet;
-/**
- * A client for the remote execution service.
- */
+/** A client for the remote execution service. */
@ThreadSafe
final class RemoteSpawnRunner implements SpawnRunner {
private final Path execRoot;
@@ -60,11 +54,13 @@ final class RemoteSpawnRunner implements SpawnRunner {
private final Platform platform;
private final GrpcRemoteExecutor executor;
+ private final GrpcActionCache remoteCache;
RemoteSpawnRunner(
Path execRoot,
RemoteOptions options,
- GrpcRemoteExecutor executor) {
+ GrpcRemoteExecutor executor,
+ GrpcActionCache remoteCache) {
this.execRoot = execRoot;
this.options = options;
if (options.experimentalRemotePlatformOverride != null) {
@@ -79,27 +75,12 @@ final class RemoteSpawnRunner implements SpawnRunner {
platform = null;
}
this.executor = executor;
- }
-
- RemoteSpawnRunner(
- Path execRoot,
- RemoteOptions options,
- AuthAndTLSOptions authTlsOptions) {
- this(execRoot, options, connect(options, authTlsOptions));
- }
-
- private static GrpcRemoteExecutor connect(RemoteOptions options,
- AuthAndTLSOptions authTlsOptions) {
- Preconditions.checkArgument(GrpcRemoteExecutor.isRemoteExecutionOptions(options));
- ChannelOptions channelOptions = ChannelOptions.create(authTlsOptions,
- options.grpcMaxChunkSizeBytes);
- return new GrpcRemoteExecutor(
- RemoteUtils.createChannel(options.remoteExecutor, channelOptions), channelOptions, options);
+ this.remoteCache = remoteCache;
}
@Override
public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy)
- throws InterruptedException, IOException {
+ throws ExecException, InterruptedException, IOException {
ActionExecutionMetadata owner = spawn.getResourceOwner();
if (owner.getOwner() != null) {
policy.report(ProgressStatus.EXECUTING);
@@ -116,46 +97,38 @@ final class RemoteSpawnRunner implements SpawnRunner {
Action action =
buildAction(
spawn.getOutputFiles(),
- ContentDigests.computeDigest(command),
- repository.getMerkleDigest(inputRoot));
+ Digests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot),
+ // TODO(olaola): set sensible local and remote timouts.
+ Spawns.getTimeoutSeconds(spawn, 120));
- ActionKey actionKey = ContentDigests.computeActionKey(action);
+ ActionKey actionKey = Digests.computeActionKey(action);
ActionResult result =
- this.options.remoteAcceptCached ? executor.getCachedActionResult(actionKey) : null;
+ options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
if (result == null) {
// Cache miss or we don't accept cache hits.
// Upload the command and all the inputs into the remote cache.
- executor.uploadBlob(command.toByteArray());
- // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
- executor.uploadTree(repository, execRoot, inputRoot);
+ remoteCache.uploadBlob(command.toByteArray());
+ remoteCache.uploadTree(repository, execRoot, inputRoot);
// TODO(olaola): set BuildInfo and input total bytes as well.
ExecuteRequest.Builder request =
ExecuteRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
.setAction(action)
- .setAcceptCached(this.options.remoteAcceptCached)
+ .setWaitForCompletion(true)
.setTotalInputFileCount(inputMap.size())
- .setTimeoutMillis(policy.getTimeoutMillis());
- ExecuteReply reply = executor.executeRemotely(request.build());
- ExecutionStatus status = reply.getStatus();
-
- if (!status.getSucceeded()
- && (status.getError() != ExecutionStatus.ErrorCode.EXEC_FAILED)) {
- return new SpawnResult.Builder()
- // TODO(ulfjack): Improve the translation of the error status.
- .setStatus(Status.EXECUTION_FAILED)
- .setExitCode(-1)
- .build();
- }
-
- result = reply.getResult();
+ .setSkipCacheLookup(!options.remoteAcceptCached);
+ result = executor.executeRemotely(request.build()).getResult();
}
// TODO(ulfjack): Download stdout, stderr, and the output files in a single call.
- passRemoteOutErr(executor, result, policy.getFileOutErr());
- executor.downloadAllResults(result, execRoot);
+ passRemoteOutErr(remoteCache, result, policy.getFileOutErr());
+ if (result.getExitCode() == 0) {
+ remoteCache.downloadAllResults(result, execRoot);
+ }
return new SpawnResult.Builder()
- .setStatus(Status.SUCCESS)
- .setExitCode(result.getReturnCode())
+ .setStatus(Status.SUCCESS) // Even if the action failed with non-zero exit code.
+ .setExitCode(result.getExitCode())
.build();
} catch (StatusRuntimeException | CacheNotFoundException e) {
throw new IOException(e);
@@ -163,37 +136,58 @@ final class RemoteSpawnRunner implements SpawnRunner {
}
private Action buildAction(
- Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Collection<? extends ActionInput> outputs,
+ Digest command,
+ Digest inputRoot,
+ long timeoutSeconds) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
// Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
for (ActionInput output : outputs) {
- action.addOutputPath(output.getExecPathString());
+ // TODO: output directories should be handled here, when they are supported.
+ action.addOutputFiles(output.getExecPathString());
}
if (platform != null) {
action.setPlatform(platform);
}
+ action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds));
return action.build();
}
private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
Command.Builder command = Command.newBuilder();
- command.addAllArgv(arguments);
+ command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(environment.keySet());
for (String var : variables) {
- command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var));
}
return command.build();
}
private static void passRemoteOutErr(
- RemoteActionCache cache, ActionResult result, FileOutErr outErr)
- throws CacheNotFoundException {
- ImmutableList<byte[]> streams =
- cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
- outErr.printOut(new String(streams.get(0), UTF_8));
- outErr.printErr(new String(streams.get(1), UTF_8));
+ RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException {
+ try {
+ if (!result.getStdoutRaw().isEmpty()) {
+ result.getStdoutRaw().writeTo(outErr.getOutputStream());
+ outErr.getOutputStream().flush();
+ } else if (result.hasStdoutDigest()) {
+ byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest());
+ outErr.getOutputStream().write(stdoutBytes);
+ outErr.getOutputStream().flush();
+ }
+ if (!result.getStderrRaw().isEmpty()) {
+ result.getStderrRaw().writeTo(outErr.getErrorStream());
+ outErr.getErrorStream().flush();
+ } else if (result.hasStderrDigest()) {
+ byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest());
+ outErr.getErrorStream().write(stderrBytes);
+ outErr.getErrorStream().flush();
+ }
+ } catch (CacheNotFoundException e) {
+ outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss.");
+ outErr.getOutputStream().flush();
+ }
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
index f3e12f07ee..3f7698252e 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
@@ -15,7 +15,6 @@
package com.google.devtools.build.lib.remote;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.ActionInput;
@@ -32,15 +31,7 @@ import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.exec.SpawnInputExpander;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.rules.fileset.FilesetActionContext;
import com.google.devtools.build.lib.standalone.StandaloneSpawnStrategy;
@@ -48,6 +39,14 @@ import com.google.devtools.build.lib.util.CommandFailureUtils;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.StatusRuntimeException;
@@ -88,12 +87,12 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
this.standaloneStrategy = new StandaloneSpawnStrategy(execRoot, verboseFailures, productName);
this.verboseFailures = verboseFailures;
this.remoteOptions = remoteOptions;
- channelOptions = ChannelOptions.create(authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes);
+ channelOptions = ChannelOptions.create(authTlsOptions);
if (remoteOptions.experimentalRemotePlatformOverride != null) {
Platform.Builder platformBuilder = Platform.newBuilder();
try {
- TextFormat.getParser().merge(remoteOptions.experimentalRemotePlatformOverride,
- platformBuilder);
+ TextFormat.getParser()
+ .merge(remoteOptions.experimentalRemotePlatformOverride, platformBuilder);
} catch (ParseException e) {
throw new IllegalArgumentException(
"Failed to parse --experimental_remote_platform_override", e);
@@ -105,27 +104,32 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
private Action buildAction(
- Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Collection<? extends ActionInput> outputs,
+ Digest command,
+ Digest inputRoot,
+ long timeoutSeconds) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
// Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
for (ActionInput output : outputs) {
- action.addOutputPath(output.getExecPathString());
+ // TODO: output directories should be handled here, when they are supported.
+ action.addOutputFiles(output.getExecPathString());
}
if (platform != null) {
action.setPlatform(platform);
}
+ action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds));
return action.build();
}
private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
Command.Builder command = Command.newBuilder();
- command.addAllArgv(arguments);
+ command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(environment.keySet());
for (String var : variables) {
- command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var));
}
return command.build();
}
@@ -137,11 +141,11 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
private void execLocally(
Spawn spawn,
ActionExecutionContext actionExecutionContext,
- RemoteActionCache actionCache,
+ RemoteActionCache remoteCache,
ActionKey actionKey)
throws ExecException, InterruptedException {
standaloneStrategy.exec(spawn, actionExecutionContext);
- if (remoteOptions.remoteUploadLocalResults && actionCache != null && actionKey != null) {
+ if (remoteOptions.remoteUploadLocalResults && remoteCache != null && actionKey != null) {
ArrayList<Path> outputFiles = new ArrayList<>();
for (ActionInput output : spawn.getOutputFiles()) {
Path outputFile = execRoot.getRelative(output.getExecPathString());
@@ -155,17 +159,17 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
try {
ActionResult.Builder result = ActionResult.newBuilder();
- actionCache.uploadAllResults(execRoot, outputFiles, result);
+ remoteCache.uploadAllResults(execRoot, outputFiles, result);
FileOutErr outErr = actionExecutionContext.getFileOutErr();
if (outErr.getErrorPath().exists()) {
- ContentDigest stderr = actionCache.uploadFileContents(outErr.getErrorPath());
+ Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath());
result.setStderrDigest(stderr);
}
if (outErr.getOutputPath().exists()) {
- ContentDigest stdout = actionCache.uploadFileContents(outErr.getOutputPath());
+ Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath());
result.setStdoutDigest(stdout);
}
- actionCache.setCachedActionResult(actionKey, result.build());
+ remoteCache.setCachedActionResult(actionKey, result.build());
// Handle all cache errors here.
} catch (IOException e) {
throw new UserExecException("Unexpected IO error.", e);
@@ -188,21 +192,25 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
private static void passRemoteOutErr(
RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException {
try {
- ImmutableList<byte[]> streams =
- cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
- byte[] stdout = streams.get(0);
- byte[] stderr = streams.get(1);
- // Don't round-trip through String - write to the output streams directly.
- if (stdout.length != 0) {
- outErr.getOutputStream().write(stdout);
+ if (!result.getStdoutRaw().isEmpty()) {
+ result.getStdoutRaw().writeTo(outErr.getOutputStream());
+ outErr.getOutputStream().flush();
+ } else if (result.hasStdoutDigest()) {
+ byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest());
+ outErr.getOutputStream().write(stdoutBytes);
outErr.getOutputStream().flush();
}
- if (stderr.length != 0) {
- outErr.getErrorStream().write(stderr);
+ if (!result.getStderrRaw().isEmpty()) {
+ result.getStderrRaw().writeTo(outErr.getErrorStream());
+ outErr.getErrorStream().flush();
+ } else if (result.hasStderrDigest()) {
+ byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest());
+ outErr.getErrorStream().write(stderrBytes);
outErr.getErrorStream().flush();
}
} catch (CacheNotFoundException e) {
- // Ignoring.
+ outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss.");
+ outErr.getOutputStream().flush();
}
}
@@ -220,19 +228,23 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
Executor executor = actionExecutionContext.getExecutor();
EventHandler eventHandler = executor.getEventHandler();
- RemoteActionCache actionCache = null;
+ RemoteActionCache remoteCache = null;
GrpcRemoteExecutor workExecutor = null;
if (spawn.isRemotable()) {
// Initialize remote cache and execution handlers. We use separate handlers for every
// action to enable server-side parallelism (need a different gRPC channel per action).
if (SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)) {
- actionCache = new SimpleBlobStoreActionCache(SimpleBlobStoreFactory.create(remoteOptions));
+ remoteCache = new SimpleBlobStoreActionCache(SimpleBlobStoreFactory.create(remoteOptions));
} else if (GrpcActionCache.isRemoteCacheOptions(remoteOptions)) {
- actionCache = new GrpcActionCache(remoteOptions, channelOptions);
+ remoteCache =
+ new GrpcActionCache(
+ RemoteUtils.createChannel(remoteOptions.remoteCache, channelOptions),
+ channelOptions,
+ remoteOptions);
}
- // Otherwise actionCache remains null and remote caching/execution are disabled.
+ // Otherwise remoteCache remains null and remote caching/execution are disabled.
- if (actionCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(remoteOptions)) {
+ if (remoteCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(remoteOptions)) {
workExecutor =
new GrpcRemoteExecutor(
RemoteUtils.createChannel(remoteOptions.remoteExecutor, channelOptions),
@@ -240,15 +252,16 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
remoteOptions);
}
}
- if (!spawn.isRemotable() || actionCache == null) {
+ if (!spawn.isRemotable() || remoteCache == null) {
standaloneStrategy.exec(spawn, actionExecutionContext);
return;
}
if (executor.reportsSubcommands()) {
executor.reportSubcommand(spawn);
}
- executor.getEventBus().post(
- ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote"));
+ executor
+ .getEventBus()
+ .post(ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote"));
try {
// Temporary hack: the TreeNodeRepository should be created and maintained upstream!
@@ -266,22 +279,25 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
Action action =
buildAction(
spawn.getOutputFiles(),
- ContentDigests.computeDigest(command),
- repository.getMerkleDigest(inputRoot));
+ Digests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot),
+ // TODO(olaola): set sensible local and remote timouts.
+ Spawns.getTimeoutSeconds(spawn, 120));
// Look up action cache, and reuse the action output if it is found.
- actionKey = ContentDigests.computeActionKey(action);
- ActionResult result = this.remoteOptions.remoteAcceptCached
- ? actionCache.getCachedActionResult(actionKey)
- : null;
+ actionKey = Digests.computeActionKey(action);
+ ActionResult result =
+ this.remoteOptions.remoteAcceptCached
+ ? remoteCache.getCachedActionResult(actionKey)
+ : null;
boolean acceptCachedResult = this.remoteOptions.remoteAcceptCached;
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try {
- actionCache.downloadAllResults(result, execRoot);
- passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
+ remoteCache.downloadAllResults(result, execRoot);
+ passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr());
return;
} catch (CacheNotFoundException e) {
acceptCachedResult = false; // Retry the action remotely and invalidate the results.
@@ -289,47 +305,39 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
if (workExecutor == null) {
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
return;
}
// Upload the command and all the inputs into the remote cache.
- actionCache.uploadBlob(command.toByteArray());
- // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
- actionCache.uploadTree(repository, execRoot, inputRoot);
+ remoteCache.uploadBlob(command.toByteArray());
+ remoteCache.uploadTree(repository, execRoot, inputRoot);
// TODO(olaola): set BuildInfo and input total bytes as well.
ExecuteRequest.Builder request =
ExecuteRequest.newBuilder()
+ .setInstanceName(remoteOptions.remoteInstanceName)
.setAction(action)
- .setAcceptCached(acceptCachedResult)
+ .setWaitForCompletion(true)
.setTotalInputFileCount(inputMap.size())
- .setTimeoutMillis(1000 * Spawns.getTimeoutSeconds(spawn, 120));
- // TODO(olaola): set sensible local and remote timouts.
- ExecuteReply reply = workExecutor.executeRemotely(request.build());
- ExecutionStatus status = reply.getStatus();
+ .setSkipCacheLookup(!acceptCachedResult);
+ ExecuteResponse reply = workExecutor.executeRemotely(request.build());
result = reply.getResult();
- // We do not want to pass on the remote stdout and strerr if we are going to retry the
- // action.
- if (status.getSucceeded()) {
- passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
- actionCache.downloadAllResults(result, execRoot);
- if (result.getReturnCode() != 0) {
- String cwd = executor.getExecRoot().getPathString();
- String message =
- CommandFailureUtils.describeCommandFailure(
- verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd);
- throw new UserExecException(message + ": Exit " + result.getReturnCode());
- }
+ if (remoteOptions.remoteLocalFallback && result.getExitCode() != 0) {
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
return;
}
- if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED
- || !remoteOptions.remoteLocalFallback) {
- passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
- throw new UserExecException(status.getErrorDetail());
+ passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr());
+ if (result.getExitCode() == 0) {
+ remoteCache.downloadAllResults(result, execRoot);
+ } else {
+ String cwd = executor.getExecRoot().getPathString();
+ String message =
+ CommandFailureUtils.describeCommandFailure(
+ verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd);
+ throw new UserExecException(message + ": Exit " + result.getExitCode());
}
// For now, we retry locally on all other remote errors.
// TODO(olaola): add remote retries on cache miss errors.
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
} catch (IOException e) {
throw new UserExecException("Unexpected IO error.", e);
} catch (InterruptedException e) {
@@ -343,14 +351,15 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
eventHandler.handle(Event.warn(mnemonic + " remote work failed (" + e + ")" + stackTrace));
if (remoteOptions.remoteLocalFallback) {
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
} else {
throw new UserExecException(e);
}
} catch (CacheNotFoundException e) {
+ // TODO(olaola): handle this exception by reuploading / reexecuting the action remotely.
eventHandler.handle(Event.warn(mnemonic + " remote work results cache miss (" + e + ")"));
if (remoteOptions.remoteLocalFallback) {
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
} else {
throw new UserExecException(e);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
index 845f5411fe..58b2c94bf8 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
@@ -26,8 +26,7 @@ public final class RemoteUtils {
NettyChannelBuilder builder =
NettyChannelBuilder.forTarget(target)
.negotiationType(
- channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT)
- .maxMessageSize(channelOptions.maxMessageSize());
+ channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT);
if (channelOptions.getSslContext() != null) {
builder.sslContext(channelOptions.getSslContext());
if (channelOptions.getTlsAuthorityOverride() != null) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index bb1ca512d5..75d208a687 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -14,28 +14,27 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
+import com.google.devtools.remoteexecution.v1test.DirectoryNode;
+import com.google.devtools.remoteexecution.v1test.FileNode;
+import com.google.devtools.remoteexecution.v1test.OutputDirectory;
+import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Semaphore;
@@ -59,8 +58,8 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
- for (FileNode fileNode : repository.treeToFileNodes(root)) {
- uploadBlob(fileNode.toByteArray());
+ for (Directory directory : repository.treeToDirectories(root)) {
+ uploadBlob(directory.toByteArray());
}
// TODO(ulfjack): Only upload files that aren't in the CAS yet?
for (TreeNode leaf : repository.leaves(root)) {
@@ -69,26 +68,26 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
@Override
- public void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ public void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException {
- FileNode fileNode = FileNode.parseFrom(downloadBlob(rootDigest));
- if (fileNode.hasFileMetadata()) {
- FileMetadata meta = fileNode.getFileMetadata();
- downloadFileContents(meta.getDigest(), rootLocation, meta.getExecutable());
+ Directory directory = Directory.parseFrom(downloadBlob(rootDigest));
+ for (FileNode file : directory.getFilesList()) {
+ downloadFileContents(
+ file.getDigest(), rootLocation.getRelative(file.getName()), file.getIsExecutable());
}
- for (FileNode.Child child : fileNode.getChildList()) {
- downloadTree(child.getDigest(), rootLocation.getRelative(child.getPath()));
+ for (DirectoryNode child : directory.getDirectoriesList()) {
+ downloadTree(child.getDigest(), rootLocation.getRelative(child.getName()));
}
}
@Override
- public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException {
+ public Digest uploadFileContents(Path file) throws IOException, InterruptedException {
// This unconditionally reads the whole file into memory first!
return uploadBlob(ByteString.readFrom(file.getInputStream()).toByteArray());
}
@Override
- public ContentDigest uploadFileContents(
+ public Digest uploadFileContents(
ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException {
// This unconditionally reads the whole file into memory first!
@@ -96,26 +95,31 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
((VirtualActionInput) input).writeTo(buffer);
byte[] blob = buffer.toByteArray();
- return uploadBlob(blob, ContentDigests.computeDigest(blob));
+ return uploadBlob(blob, Digests.computeDigest(blob));
}
return uploadBlob(
ByteString.readFrom(execRoot.getRelative(input.getExecPathString()).getInputStream())
.toByteArray(),
- ContentDigests.getDigestFromInputCache(input, inputCache));
+ Digests.getDigestFromInputCache(input, inputCache));
}
@Override
public void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException {
- for (Output output : result.getOutputList()) {
- if (output.getContentCase() == ContentCase.FILE_METADATA) {
- FileMetadata m = output.getFileMetadata();
- downloadFileContents(
- m.getDigest(), execRoot.getRelative(output.getPath()), m.getExecutable());
+ for (OutputFile file : result.getOutputFilesList()) {
+ if (!file.getContent().isEmpty()) {
+ createFile(
+ file.getContent().toByteArray(),
+ execRoot.getRelative(file.getPath()),
+ file.getIsExecutable());
} else {
- downloadTree(output.getDigest(), execRoot.getRelative(output.getPath()));
+ downloadFileContents(
+ file.getDigest(), execRoot.getRelative(file.getPath()), file.getIsExecutable());
}
}
+ for (OutputDirectory directory : result.getOutputDirectoriesList()) {
+ downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath()));
+ }
}
@Override
@@ -130,22 +134,25 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
// TreeNodeRepository to call uploadTree.
throw new UnsupportedOperationException("Storing a directory is not yet supported.");
}
+ // TODO(olaola): inline small file contents here.
// First put the file content to cache.
- ContentDigest digest = uploadFileContents(file);
+ Digest digest = uploadFileContents(file);
// Add to protobuf.
result
- .addOutputBuilder()
+ .addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
- .getFileMetadataBuilder()
.setDigest(digest)
- .setExecutable(file.isExecutable());
+ .setIsExecutable(file.isExecutable());
}
}
- private void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
+ private void downloadFileContents(Digest digest, Path dest, boolean executable)
throws IOException, CacheNotFoundException {
// This unconditionally downloads the whole file into memory first!
- byte[] contents = downloadBlob(digest);
+ createFile(downloadBlob(digest), dest, executable);
+ }
+
+ private void createFile(byte[] contents, Path dest, boolean executable) throws IOException {
FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
try (OutputStream stream = dest.getOutputStream()) {
stream.write(contents);
@@ -153,16 +160,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
dest.setExecutable(executable);
}
- @Override
- public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
- throws InterruptedException {
- ArrayList<ContentDigest> digests = new ArrayList<>();
- for (byte[] blob : blobs) {
- digests.add(uploadBlob(blob));
- }
- return ImmutableList.copyOf(digests);
- }
-
private void checkBlobSize(long blobSizeKBytes, String type) {
Preconditions.checkArgument(
blobSizeKBytes < MAX_MEMORY_KBYTES,
@@ -172,16 +169,16 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
@Override
- public ContentDigest uploadBlob(byte[] blob) throws InterruptedException {
- return uploadBlob(blob, ContentDigests.computeDigest(blob));
+ public Digest uploadBlob(byte[] blob) throws InterruptedException {
+ return uploadBlob(blob, Digests.computeDigest(blob));
}
- private ContentDigest uploadBlob(byte[] blob, ContentDigest digest) throws InterruptedException {
+ private Digest uploadBlob(byte[] blob, Digest digest) throws InterruptedException {
int blobSizeKBytes = blob.length / 1024;
checkBlobSize(blobSizeKBytes, "Upload");
uploadMemoryAvailable.acquire(blobSizeKBytes);
try {
- blobStore.put(ContentDigests.toHexString(digest), blob);
+ blobStore.put(digest.getHash(), blob);
} finally {
uploadMemoryAvailable.release(blobSizeKBytes);
}
@@ -189,36 +186,26 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
@Override
- public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException {
+ public byte[] downloadBlob(Digest digest) throws CacheNotFoundException {
if (digest.getSizeBytes() == 0) {
return new byte[0];
}
// This unconditionally downloads the whole blob into memory!
checkBlobSize(digest.getSizeBytes() / 1024, "Download");
- byte[] data = blobStore.get(ContentDigests.toHexString(digest));
+ byte[] data = blobStore.get(digest.getHash());
if (data == null) {
throw new CacheNotFoundException(digest);
}
return data;
}
- @Override
- public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
- throws CacheNotFoundException {
- ArrayList<byte[]> blobs = new ArrayList<>();
- for (ContentDigest c : digests) {
- blobs.add(downloadBlob(c));
- }
- return ImmutableList.copyOf(blobs);
- }
-
- public boolean containsKey(ContentDigest digest) {
- return blobStore.containsKey(ContentDigests.toHexString(digest));
+ public boolean containsKey(Digest digest) {
+ return blobStore.containsKey(digest.getHash());
}
@Override
public ActionResult getCachedActionResult(ActionKey actionKey) {
- byte[] data = blobStore.get(ContentDigests.toHexString(actionKey.getDigest()));
+ byte[] data = blobStore.get(actionKey.getDigest().getHash());
if (data == null) {
return null;
}
@@ -232,6 +219,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
@Override
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws InterruptedException {
- blobStore.put(ContentDigests.toHexString(actionKey.getDigest()), result.toByteArray());
+ blobStore.put(actionKey.getDigest().getHash(), result.toByteArray());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
index 8272295a84..dba3a14c12 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
@@ -29,11 +29,11 @@ import com.google.devtools.build.lib.concurrent.BlazeInterners;
import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.exec.SpawnInputExpander;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
@@ -185,11 +185,11 @@ public final class TreeNodeRepository extends TreeTraverser<TreeNodeRepository.T
// be part of the state.
private final Path execRoot;
private final ActionInputFileCache inputFileCache;
- private final Map<TreeNode, ContentDigest> treeNodeDigestCache = new HashMap<>();
- private final Map<ContentDigest, TreeNode> digestTreeNodeCache = new HashMap<>();
- private final Map<TreeNode, FileNode> fileNodeCache = new HashMap<>();
- private final Map<VirtualActionInput, ContentDigest> virtualInputDigestCache = new HashMap<>();
- private final Map<ContentDigest, VirtualActionInput> digestVirtualInputCache = new HashMap<>();
+ private final Map<TreeNode, Digest> treeNodeDigestCache = new HashMap<>();
+ private final Map<Digest, TreeNode> digestTreeNodeCache = new HashMap<>();
+ private final Map<TreeNode, Directory> directoryCache = new HashMap<>();
+ private final Map<VirtualActionInput, Digest> virtualInputDigestCache = new HashMap<>();
+ private final Map<Digest, VirtualActionInput> digestVirtualInputCache = new HashMap<>();
public TreeNodeRepository(Path execRoot, ActionInputFileCache inputFileCache) {
this.execRoot = execRoot;
@@ -298,125 +298,125 @@ public final class TreeNodeRepository extends TreeTraverser<TreeNodeRepository.T
return interner.intern(new TreeNode(entries));
}
- private synchronized FileNode getOrComputeFileNode(TreeNode node) throws IOException {
+ private synchronized Directory getOrComputeDirectory(TreeNode node) throws IOException {
// Assumes all child digests have already been computed!
- FileNode fileNode = fileNodeCache.get(node);
- if (fileNode == null) {
- FileNode.Builder b = FileNode.newBuilder();
- if (node.isLeaf()) {
- ActionInput input = node.getActionInput();
- if (input instanceof VirtualActionInput) {
- VirtualActionInput virtualInput = (VirtualActionInput) input;
- ContentDigest digest = ContentDigests.computeDigest(virtualInput);
- virtualInputDigestCache.put(virtualInput, digest);
- // There may be multiple inputs with the same digest. In that case, we don't care which
- // one we get back from the digestVirtualInputCache later.
- digestVirtualInputCache.put(digest, virtualInput);
- b.getFileMetadataBuilder()
- .setDigest(digest)
- // We always declare virtual action inputs as non-executable for now.
- .setExecutable(false);
+ Preconditions.checkArgument(!node.isLeaf());
+ Directory directory = directoryCache.get(node);
+ if (directory == null) {
+ Directory.Builder b = Directory.newBuilder();
+ for (TreeNode.ChildEntry entry : node.getChildEntries()) {
+ TreeNode child = entry.getChild();
+ if (child.isLeaf()) {
+ ActionInput input = child.getActionInput();
+ if (input instanceof VirtualActionInput) {
+ VirtualActionInput virtualInput = (VirtualActionInput) input;
+ Digest digest = Digests.computeDigest(virtualInput);
+ virtualInputDigestCache.put(virtualInput, digest);
+ // There may be multiple inputs with the same digest. In that case, we don't care which
+ // one we get back from the digestVirtualInputCache later.
+ digestVirtualInputCache.put(digest, virtualInput);
+ b.addFilesBuilder()
+ .setName(entry.getSegment())
+ .setDigest(digest)
+ .setIsExecutable(false);
+ } else {
+ b.addFilesBuilder()
+ .setName(entry.getSegment())
+ .setDigest(Digests.getDigestFromInputCache(input, inputFileCache))
+ .setIsExecutable(execRoot.getRelative(input.getExecPathString()).isExecutable());
+ }
} else {
- b.getFileMetadataBuilder()
- .setDigest(ContentDigests.getDigestFromInputCache(input, inputFileCache))
- .setExecutable(execRoot.getRelative(input.getExecPathString()).isExecutable());
- }
- } else {
- for (TreeNode.ChildEntry entry : node.getChildEntries()) {
- ContentDigest childDigest = treeNodeDigestCache.get(entry.getChild());
- Preconditions.checkState(childDigest != null);
- b.addChildBuilder().setPath(entry.getSegment()).setDigest(childDigest);
+ Digest childDigest = Preconditions.checkNotNull(treeNodeDigestCache.get(child));
+ b.addDirectoriesBuilder().setName(entry.getSegment()).setDigest(childDigest);
}
}
- fileNode = b.build();
- fileNodeCache.put(node, fileNode);
- ContentDigest digest = ContentDigests.computeDigest(fileNode);
+ directory = b.build();
+ directoryCache.put(node, directory);
+ Digest digest = Digests.computeDigest(directory);
treeNodeDigestCache.put(node, digest);
digestTreeNodeCache.put(digest, node);
}
- return fileNode;
+ return directory;
}
// Recursively traverses the tree, expanding and computing Merkle digests for nodes for which
// they have not yet been computed and cached.
public void computeMerkleDigests(TreeNode root) throws IOException {
synchronized (this) {
- if (fileNodeCache.get(root) != null) {
+ if (directoryCache.get(root) != null) {
// Strong assumption: the cache is valid, i.e. parent present implies children present.
return;
}
}
- if (root.isLeaf()) {
- ActionInput input = root.getActionInput();
- if (!(input instanceof VirtualActionInput)) {
- inputFileCache.getDigest(input);
- }
- } else {
+ if (!root.isLeaf()) {
for (TreeNode child : children(root)) {
computeMerkleDigests(child);
}
+ getOrComputeDirectory(root);
}
- getOrComputeFileNode(root);
}
/**
* Should only be used after computeMerkleDigests has been called on one of the node ancestors.
* Returns the precomputed digest.
*/
- public ContentDigest getMerkleDigest(TreeNode node) {
- return treeNodeDigestCache.get(node);
+ public Digest getMerkleDigest(TreeNode node) throws IOException {
+ return node.isLeaf()
+ ? actionInputToDigest(node.getActionInput())
+ : treeNodeDigestCache.get(node);
}
/**
* Returns the precomputed digests for both data and metadata. Should only be used after
* computeMerkleDigests has been called on one of the node ancestors.
*/
- public ImmutableCollection<ContentDigest> getAllDigests(TreeNode root) throws IOException {
- ImmutableSet.Builder<ContentDigest> digests = ImmutableSet.builder();
+ public ImmutableCollection<Digest> getAllDigests(TreeNode root) throws IOException {
+ ImmutableSet.Builder<Digest> digests = ImmutableSet.builder();
for (TreeNode node : descendants(root)) {
- digests.add(Preconditions.checkNotNull(treeNodeDigestCache.get(node)));
- if (node.isLeaf()) {
- digests.add(actionInputToDigest(node.getActionInput()));
- }
+ digests.add(
+ node.isLeaf()
+ ? actionInputToDigest(node.getActionInput())
+ : Preconditions.checkNotNull(treeNodeDigestCache.get(node)));
}
return digests.build();
}
- private ContentDigest actionInputToDigest(ActionInput input) throws IOException {
+ private Digest actionInputToDigest(ActionInput input) throws IOException {
if (input instanceof VirtualActionInput) {
return Preconditions.checkNotNull(virtualInputDigestCache.get(input));
}
- return ContentDigests.getDigestFromInputCache(input, inputFileCache);
+ return Digests.getDigestFromInputCache(input, inputFileCache);
}
/**
- * Serializes all of the subtree to the file node list. TODO(olaola): add a version that only
- * copies a part of the tree that we are interested in. Should only be used after
- * computeMerkleDigests has been called on one of the node ancestors.
+ * Serializes all of the subtree to a Directory list. TODO(olaola): add a version that only copies
+ * a part of the tree that we are interested in. Should only be used after computeMerkleDigests
+ * has been called on one of the node ancestors.
*/
// Note: this is not, strictly speaking, thread safe. If someone is deleting cached Merkle hashes
// while this is executing, it will trigger an exception. But I think this is WAI.
- public ImmutableList<FileNode> treeToFileNodes(TreeNode root) {
- ImmutableList.Builder<FileNode> fileNodes = ImmutableList.builder();
+ public ImmutableList<Directory> treeToDirectories(TreeNode root) {
+ ImmutableList.Builder<Directory> directories = ImmutableList.builder();
for (TreeNode node : descendants(root)) {
- fileNodes.add(Preconditions.checkNotNull(fileNodeCache.get(node)));
+ if (!node.isLeaf()) {
+ directories.add(Preconditions.checkNotNull(directoryCache.get(node)));
+ }
}
- return fileNodes.build();
+ return directories.build();
}
/**
* Should only be used on digests created by a call to computeMerkleDigests. Looks up ActionInputs
- * or FileNodes by cached digests and adds them to the lists.
+ * or Directory messages by cached digests and adds them to the lists.
*/
public void getDataFromDigests(
- Iterable<ContentDigest> digests, List<ActionInput> actionInputs, List<FileNode> nodes) {
- for (ContentDigest digest : digests) {
+ Iterable<Digest> digests, List<ActionInput> actionInputs, List<Directory> nodes) {
+ for (Digest digest : digests) {
TreeNode treeNode = digestTreeNodeCache.get(digest);
if (treeNode != null) {
- nodes.add(Preconditions.checkNotNull(fileNodeCache.get(treeNode)));
- } else {
- // If not there, it must be an ActionInput.
- ByteString hexDigest = ByteString.copyFromUtf8(ContentDigests.toHexString(digest));
+ nodes.add(Preconditions.checkNotNull(directoryCache.get(treeNode)));
+ } else { // If not there, it must be an ActionInput.
+ ByteString hexDigest = ByteString.copyFromUtf8(digest.getHash());
ActionInput input = inputFileCache.getInputFromDigest(hexDigest);
if (input == null) {
// ... or a VirtualActionInput.
diff --git a/src/main/protobuf/BUILD b/src/main/protobuf/BUILD
index 96a9394a04..d08393d17e 100644
--- a/src/main/protobuf/BUILD
+++ b/src/main/protobuf/BUILD
@@ -22,7 +22,6 @@ FILES = [
"android_deploy_info",
"apk_manifest",
"command_server",
- "remote_protocol",
]
[proto_library(
@@ -59,13 +58,6 @@ cc_grpc_library(
src = "command_server.proto",
)
-# TODO(olaola): add Golang support.
-java_grpc_library(
- name = "remote_protocol_java_grpc",
- srcs = [":remote_protocol_proto"],
- deps = [":remote_protocol_java_proto"],
-)
-
py_proto_library(
name = "build_pb_py",
srcs = ["build.proto"],
@@ -82,6 +74,5 @@ filegroup(
name = "dist_jars",
srcs = [s + "_java_proto_srcs" for s in FILES] + [
":command_server_java_grpc_srcs",
- ":remote_protocol_java_grpc_srcs",
],
)
diff --git a/src/main/protobuf/remote_protocol.proto b/src/main/protobuf/remote_protocol.proto
deleted file mode 100644
index 2a68643d1e..0000000000
--- a/src/main/protobuf/remote_protocol.proto
+++ /dev/null
@@ -1,329 +0,0 @@
-// Copyright 2015 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package google.devtools.remote_execution;
-
-option java_package = "com.google.devtools.build.lib.remote";
-
-message ContentDigest {
- bytes digest = 1; // A hash of the contents of the file, blob, or tree node
- // (see below). The contents are digested using SHA1 (20 bytes).
- int64 size_bytes = 2;
- int32 version = 3; // Will be 0 by default, and increase if we ever change
- // the hash function used, etc.
- // And maybe other metadata later.
-}
-
-message FileMetadata {
- ContentDigest digest = 1;
- bool executable = 3;
-}
-
-// A Platform is defined by a set of opaque name-value pairs.
-message Platform {
- message Property {
- string name = 1;
- string value = 2;
- }
- // Entries are sorted by name,value to ensure a canonical form.
- repeated Property entry = 1;
-}
-
-// All the fields of this message have the following in common: they are
-// expected to define a result. All instructions to remote execution that do
-// not affect the result should be out of this message, because this message
-// will be used as the Execution Cache key.
-message Action {
- // The digest of the command (as an encoded proto blob).
- // Digested separately, because it is both long and reusable.
- ContentDigest command_digest = 1;
- // The Merkle digest of the root input directory.
- ContentDigest input_root_digest = 2;
- // Relative paths to the action's output files and directories.
- // The standard output and error stream contents are always returned
- // in addition (see ActionResult message).
- // Any unnamed output files will be discarded.
- // A path will be interpreted as a directory iff it contains a trailing
- // slash.
- // This should be sorted!
- repeated string output_path = 4;
- Platform platform = 5;
- // All other requirements that affect the result of the action should be
- // here as well.
-}
-
-message Command {
- // The argument vector. argv[0] is the binary.
- repeated string argv = 1;
- // A map of environment variables.
- // This is part of the command rather than Platform, because it does not
- // affect scheduling.
- message EnvironmentEntry {
- string variable = 1;
- string value = 2;
- }
- // Environment variables, sorted by variable.
- repeated EnvironmentEntry environment = 2;
- // Possibly other fields the describe the setup actions of the command.
-}
-
-message Output {
- string path = 1;
- // Yes, we shouldn't have to repeat paths here, but it should not be too
- // costly, and might help.
- // The actual output bodies will be stored in CAS.
- oneof content {
- // Populated for directory outputs only, contains the root FileNode digest.
- ContentDigest digest = 2;
- // Populated for file outputs only.
- FileMetadata file_metadata = 3;
- }
-}
-
-message ActionResult {
- // Relative paths to the action's output files and directories.
- // Any unnamed output files will be discarded.
- // A path will be interpreted as a directory iff it contains a trailing
- // slash.
- // This should be sorted by path.
- repeated Output output = 1;
- int32 return_code = 2;
- ContentDigest stdout_digest = 3;
- ContentDigest stderr_digest = 4;
-}
-
-// Status message shared by all CAS requests.
-message CasStatus {
- bool succeeded = 1; // Whether the requested action had server side errors
- // or not.
- enum ErrorCode {
- UNKNOWN = 0;
- INVALID_ARGUMENT = 1; // The client behaved incorrectly. error_detail should
- // have more information.
- MISSING_DIGEST = 2; // Missing a node on tree download.
- DIGEST_MISMATCH = 3; // Upload only error, when requested digest does not
- // match the server side computed one.
- NODE_PARSE_ERROR = 4; // Failed to parse digested data as node.
- // more errors...
- }
- ErrorCode error = 2;
- string error_detail = 3; // Human readable error.
- // These are a common part of the status for many CAS requests:
- repeated ContentDigest missing_digest = 4;
- repeated ContentDigest parse_failed_digest = 5; // Only relevant to trees.
-}
-
-service CasService {
- // Looks up given content keys in CAS, and returns success when found.
- // The single returned status will have the potentially missing digests,
- // which need to be re-uploaded.
- rpc Lookup(CasLookupRequest) returns (CasLookupReply) { }
- // Uploads a directory tree into CAS. Not streamed, because it is only tree
- // metadata.
- rpc UploadTreeMetadata(CasUploadTreeMetadataRequest) returns
- (CasUploadTreeMetadataReply) { }
- // Uploads data blob(s) into CAS.
- rpc UploadBlob(stream CasUploadBlobRequest) returns (CasUploadBlobReply) { }
- // Downoads a directory tree metadata from CAS.
- rpc DownloadTreeMetadata(CasDownloadTreeMetadataRequest) returns
- (CasDownloadTreeMetadataReply) { }
- // Downoads a directory tree from CAS. Returns the entire root directory.
- rpc DownloadTree(CasDownloadTreeRequest) returns (stream CasDownloadReply) { }
- // Downoads data blob(s) from CAS, returns them.
- rpc DownloadBlob(CasDownloadBlobRequest) returns (stream CasDownloadReply) { }
-}
-
-message FileNode {
- FileMetadata file_metadata = 1;
- message Child {
- string path = 1;
- ContentDigest digest = 2;
- }
- // The children should be sorted by path, and not have equal subdirectory
- // prefixes.
- repeated Child child = 2;
-}
-
-message CasLookupRequest {
- repeated ContentDigest digest = 1;
-}
-
-message CasLookupReply {
- CasStatus status = 1;
-}
-
-message CasUploadTreeMetadataRequest {
- repeated FileNode tree_node = 1;
-}
-
-message CasUploadTreeMetadataReply {
- CasStatus status = 1;
-}
-
-message CasDownloadTreeMetadataRequest {
- ContentDigest root = 1;
-}
-
-message CasDownloadTreeMetadataReply {
- CasStatus status = 1;
- repeated FileNode tree_node = 2;
-}
-
-message BlobChunk {
- ContentDigest digest = 1; // Present only in first chunk.
- int64 offset = 2;
- bytes data = 3;
-}
-
-// This will be used for batching files/blobs.
-message CasUploadBlobRequest {
- BlobChunk data = 1;
-}
-
-message CasUploadBlobReply {
- CasStatus status = 1;
-}
-
-message CasDownloadTreeRequest {
- ContentDigest root_digest = 1;
-}
-
-// This message is streamed.
-message CasDownloadReply {
- CasStatus status = 1;
- BlobChunk data = 2;
- // For trees, data is the entire root directory, zipped (for a single root).
- // For blobs, sequential chunks of multiple blobs.
-}
-
-message CasDownloadBlobRequest {
- repeated ContentDigest digest = 1;
-}
-
-service ExecutionCacheService {
- // Gets results of a cached action.
- rpc GetCachedResult(ExecutionCacheRequest) returns (ExecutionCacheReply) { }
- // Set results of a cached action. This requires reproducible builds on
- // connected machines!
- rpc SetCachedResult(ExecutionCacheSetRequest) returns
- (ExecutionCacheSetReply) {}
-}
-
-message ExecutionCacheRequest {
- ContentDigest action_digest = 1;
-}
-
-message ExecutionCacheStatus {
- // Whether the request had any server side errors. For a lookup (get result)
- // request, a true value means the result was found in the cache.
- bool succeeded = 1;
- enum ErrorCode {
- UNKNOWN = 0;
- MISSING_RESULT = 1; // The result was not found in the execution cache.
- UNSUPPORTED = 2; // The request is not supported by the server.
- // More server errors...
- }
- ErrorCode error = 2;
- string error_detail = 3; // Human readable error.
-}
-
-message ExecutionCacheReply {
- ExecutionCacheStatus status = 1;
- ActionResult result = 2;
-}
-
-message ExecutionCacheSetRequest {
- ContentDigest action_digest = 1;
- ActionResult result = 2;
-}
-
-message ExecutionCacheSetReply {
- ExecutionCacheStatus status = 1;
-}
-
-service ExecuteService {
- // Executes an action remotely.
- rpc Execute(ExecuteRequest) returns (stream ExecuteReply) { }
-}
-
-message ExecuteRequest {
- Action action = 1;
- bool accept_cached = 2;
- // Later will probably add previous attempt history, as it will be
- // useful for monitoring and probably scheduling as well.
- // These fields will be useful for scheduling, error reporting (e.g. disk
- // exceeded) and for log analysis.
- int32 total_input_file_count = 3;
- int64 total_input_file_bytes = 4;
- // Used for monitoring and scheduling.
- BuildInfo build_info = 5;
- // Timeout milliseconds for running this action.
- int64 timeout_millis = 6; // Maybe add io_timeout as well, per
- // Marc Antoine's suggestion.
- // Add other fields such as required cores, RAM, etc, that affect scheduling,
- // but not result, later. All the requirements that DO affect results should
- // be part of the Action.
-}
-
-message BuildInfo {
- string build_id = 1;
- // TBD, Fields used to identify a build action.
- // This will be useful for analysis purposes.
- // Note: we don't want to put any Bazel-specific fields in this.
-}
-
-message ExecutionStats {
-// TBD, will contain all the stats related to the execution:
-// time it took, resources, etc. Maybe will break into sub-messages
-// for various execution phases.
-}
-
-message ExecuteReply {
- ExecutionStatus status = 1;
- ActionResult result = 2;
- bool cached_result = 3; // Filled by the server on Execution Cache hit.
- ExecutionStats execution_stats = 4;
- CasStatus cas_error = 5; // A possible server-side CAS error, e.g. missing
- // inputs. The message contains the missing digests.
- // Later will introduce return AttemptHistory for monitoring and use
- // in requests.
-}
-
-message ExecutionStatus {
- bool executed = 1; // Whether the action was executed.
- bool succeeded = 2; // Whether the action succeeded.
- enum ErrorCode {
- UNKNOWN_ERROR = 0;
- MISSING_COMMAND = 1; // Missing command digest in CAS.
- MISSING_INPUT = 2; // Missing one of the inputs in CAS.
- DEADLINE_EXCEEDED = 3;
- EXEC_FAILED = 4; // Action returned non-zero.
- // Other server errors. Some of these errors are client-retriable, and some
- // not; will have to comment clearly what will happen on each error.
- }
- ErrorCode error = 3;
- string error_detail = 4;
- // These fields allow returning streaming statuses for the action progress.
- enum ActionStage {
- UNKNOWN_STAGE = 0;
- QUEUED = 1;
- EXECUTING = 2;
- FINISHED = 3;
- }
- ActionStage stage = 5;
- // Optionally will add more details pertaining to current stage, for example
- // time executing, or position in queue, etc.
-}
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index 24e0835694..87c71572c2 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -1068,8 +1068,6 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/common/options",
- "//src/main/protobuf:remote_protocol_java_grpc",
- "//src/main/protobuf:remote_protocol_java_proto",
"//third_party:api_client",
"//third_party:guava",
"//third_party:junit4",
@@ -1077,6 +1075,13 @@ java_test(
"//third_party:truth",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
+ "@googleapis//:google_bytestream_bytestream_java_grpc",
+ "@googleapis//:google_bytestream_bytestream_java_proto",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
+ "@googleapis//:google_longrunning_operations_java_proto",
+ "@googleapis//:google_rpc_code_java_proto",
+ "@googleapis//:google_rpc_status_java_proto",
],
)
diff --git a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java
index c7edee6ed0..ce445f7d4e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java
@@ -14,7 +14,9 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -35,9 +37,7 @@ import com.google.devtools.build.lib.exec.SpawnRunner;
import com.google.devtools.build.lib.exec.SpawnRunner.ProgressStatus;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionPolicy;
import com.google.devtools.build.lib.exec.util.FakeOwner;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -45,9 +45,10 @@ import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.ByteString;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.SortedMap;
import org.junit.Before;
@@ -55,110 +56,99 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.MockitoAnnotations.Mock;
/** Tests for {@link CachedLocalSpawnRunner}. */
@RunWith(JUnit4.class)
public class CachedLocalSpawnRunnerTest {
- private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = new ArtifactExpander() {
- @Override
- public void expand(Artifact artifact, Collection<? super Artifact> output) {
- output.add(artifact);
- }
- };
-
- private static final ContentDigest DIGEST_FOR_EMPTY = ContentDigests.computeDigest(new byte[0]);
+ private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER =
+ new ArtifactExpander() {
+ @Override
+ public void expand(Artifact artifact, Collection<? super Artifact> output) {
+ output.add(artifact);
+ }
+ };
private FileSystem fs;
private Path execRoot;
private SimpleSpawn simpleSpawn;
private FakeActionInputFileCache fakeFileCache;
-
+ @Mock private RemoteActionCache cache;
+ @Mock private SpawnRunner delegate;
+ CachedLocalSpawnRunner runner;
private FileOutErr outErr;
- private final SpawnExecutionPolicy simplePolicy = new SpawnExecutionPolicy() {
- @Override
- public void lockOutputFiles() throws InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ActionInputFileCache getActionInputFileCache() {
- return fakeFileCache;
- }
-
- @Override
- public long getTimeoutMillis() {
- return 0;
- }
-
- @Override
- public FileOutErr getFileOutErr() {
- return outErr;
- }
-
- @Override
- public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException {
- return new SpawnInputExpander(/*strict*/false)
- .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace");
- }
-
- @Override
- public void report(ProgressStatus state) {
- // TODO(ulfjack): Test that the right calls are made.
- }
- };
+ private final SpawnExecutionPolicy simplePolicy =
+ new SpawnExecutionPolicy() {
+ @Override
+ public void lockOutputFiles() throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ActionInputFileCache getActionInputFileCache() {
+ return fakeFileCache;
+ }
+
+ @Override
+ public long getTimeoutMillis() {
+ return 0;
+ }
+
+ @Override
+ public FileOutErr getFileOutErr() {
+ return outErr;
+ }
+
+ @Override
+ public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException {
+ return new SpawnInputExpander(/*strict*/ false)
+ .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace");
+ }
+
+ @Override
+ public void report(ProgressStatus state) {
+ // TODO(ulfjack): Test that the right calls are made.
+ }
+ };
@Before
public final void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
fs = new InMemoryFileSystem();
execRoot = fs.getPath("/exec/root");
FileSystemUtils.createDirectoryAndParents(execRoot);
fakeFileCache = new FakeActionInputFileCache(execRoot);
- simpleSpawn = new SimpleSpawn(
- new FakeOwner("Mnemonic", "Progress Message"),
- ImmutableList.of("/bin/echo", "Hi!"),
- ImmutableMap.of("VARIABLE", "value"),
- /*executionInfo=*/ImmutableMap.<String, String>of(),
- /*inputs=*/ImmutableList.of(ActionInputHelper.fromPath("input")),
- /*outputs=*/ImmutableList.<ActionInput>of(),
- ResourceSet.ZERO
- );
+ simpleSpawn =
+ new SimpleSpawn(
+ new FakeOwner("Mnemonic", "Progress Message"),
+ ImmutableList.of("/bin/echo", "Hi!"),
+ ImmutableMap.of("VARIABLE", "value"),
+ /*executionInfo=*/ ImmutableMap.<String, String>of(),
+ /*inputs=*/ ImmutableList.of(ActionInputHelper.fromPath("input")),
+ /*outputs=*/ ImmutableList.<ActionInput>of(),
+ ResourceSet.ZERO);
Path stdout = fs.getPath("/tmp/stdout");
Path stderr = fs.getPath("/tmp/stderr");
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
outErr = new FileOutErr(stdout, stderr);
- }
-
- private void scratch(ActionInput input, String content) throws IOException {
- Path inputFile = execRoot.getRelative(input.getExecPath());
- FileSystemUtils.writeContentAsLatin1(inputFile, content);
- fakeFileCache.setDigest(
- simpleSpawn.getInputFiles().get(0), ByteString.copyFrom(inputFile.getSHA1Digest()));
+ RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+ runner = new CachedLocalSpawnRunner(execRoot, options, cache, delegate);
+ fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz");
}
@SuppressWarnings("unchecked")
@Test
public void cacheHit() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- RemoteActionCache cache = Mockito.mock(RemoteActionCache.class);
- SpawnRunner delegate = Mockito.mock(SpawnRunner.class);
- CachedLocalSpawnRunner runner =
- new CachedLocalSpawnRunner(execRoot, options, cache, delegate);
- when(cache.getCachedActionResult(any(ActionKey.class)))
- .thenReturn(ActionResult.newBuilder().setReturnCode(0).build());
- when(cache.downloadBlobs(any(Iterable.class)))
- .thenReturn(ImmutableList.of(new byte[0], new byte[0]));
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
+ ActionResult actionResult = ActionResult.getDefaultInstance();
+ when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult);
SpawnResult result = runner.exec(simpleSpawn, simplePolicy);
- // We use verify to check that each method is called exactly once.
- // TODO(ulfjack): Check that we also call it with exactly the right parameters, not just any.
- verify(cache).getCachedActionResult(any(ActionKey.class));
- verify(cache).downloadAllResults(any(ActionResult.class), any(Path.class));
- verify(cache).downloadBlobs(any(Iterable.class));
+ // In line with state-based testing, we only verify calls that produce side effects.
+ verify(cache).downloadAllResults(actionResult, execRoot);
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.hasRecordedOutput()).isFalse();
@@ -168,34 +158,42 @@ public class CachedLocalSpawnRunnerTest {
@SuppressWarnings("unchecked")
@Test
public void cacheHitWithOutput() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- RemoteActionCache cache = Mockito.mock(RemoteActionCache.class);
- SpawnRunner delegate = Mockito.mock(SpawnRunner.class);
- CachedLocalSpawnRunner runner =
- new CachedLocalSpawnRunner(execRoot, options, cache, delegate);
- when(cache.getCachedActionResult(any(ActionKey.class)))
- .thenReturn(ActionResult.newBuilder().setReturnCode(0).build());
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
- byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
- byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8);
- ContentDigest stdOutDigest = ContentDigests.computeDigest(cacheStdOut);
- ContentDigest stdErrDigest = ContentDigests.computeDigest(cacheStdErr);
-
- ActionResult actionResult = ActionResult.newBuilder()
- .setReturnCode(0)
- .setStdoutDigest(stdOutDigest)
- .setStderrDigest(stdErrDigest)
- .build();
+ byte[] cacheStdOut = "stdout".getBytes(UTF_8);
+ byte[] cacheStdErr = "stderr".getBytes(UTF_8);
+ Digest stdOutDigest = Digests.computeDigest(cacheStdOut);
+ Digest stdErrDigest = Digests.computeDigest(cacheStdErr);
+
+ ActionResult actionResult =
+ ActionResult.newBuilder()
+ .setStdoutDigest(stdOutDigest)
+ .setStderrDigest(stdErrDigest)
+ .build();
when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult);
- when(cache.downloadBlobs(any(Iterable.class)))
- .thenReturn(ImmutableList.of(cacheStdOut, cacheStdErr));
+ when(cache.downloadBlob(stdOutDigest)).thenReturn(cacheStdOut);
+ when(cache.downloadBlob(stdErrDigest)).thenReturn(cacheStdErr);
SpawnResult result = runner.exec(simpleSpawn, simplePolicy);
- // We use verify to check that each method is called exactly once.
- verify(cache).getCachedActionResult(any(ActionKey.class));
- verify(cache).downloadAllResults(any(ActionResult.class), any(Path.class));
- verify(cache).downloadBlobs(any(Iterable.class));
+ // In line with state-based testing, we only verify calls that produce side effects.
+ verify(cache).downloadAllResults(actionResult, execRoot);
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
+ assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
+ assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void cacheHitWithOutputsInlined() throws Exception {
+ ActionResult actionResult =
+ ActionResult.newBuilder()
+ .setStdoutRaw(ByteString.copyFromUtf8("stdout"))
+ .setStderrRaw(ByteString.copyFromUtf8("stderr"))
+ .build();
+ when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult);
+
+ SpawnResult result = runner.exec(simpleSpawn, simplePolicy);
+ // In line with state-based testing, we only verify calls that produce side effects.
+ verify(cache).downloadAllResults(actionResult, execRoot);
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
@@ -205,33 +203,29 @@ public class CachedLocalSpawnRunnerTest {
@SuppressWarnings("unchecked")
@Test
public void cacheMiss() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- RemoteActionCache cache = Mockito.mock(RemoteActionCache.class);
- SpawnRunner delegate = Mockito.mock(SpawnRunner.class);
- CachedLocalSpawnRunner runner =
- new CachedLocalSpawnRunner(execRoot, options, cache, delegate);
- when(cache.getCachedActionResult(any(ActionKey.class)))
- .thenReturn(ActionResult.newBuilder().setReturnCode(0).build());
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
-
- when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
- when(cache.uploadFileContents(any(Path.class))).thenReturn(DIGEST_FOR_EMPTY);
- SpawnResult delegateResult = new SpawnResult.Builder()
- .setExitCode(0)
- .setStatus(Status.SUCCESS)
- .build();
+ byte[] cacheStdOut = "stdout".getBytes(UTF_8);
+ byte[] cacheStdErr = "stderr".getBytes(UTF_8);
+ Digest stdOutDigest = Digests.computeDigest(cacheStdOut);
+ Digest stdErrDigest = Digests.computeDigest(cacheStdErr);
+ when(cache.uploadFileContents(any(Path.class))).thenReturn(stdErrDigest, stdOutDigest);
+ SpawnResult delegateResult =
+ new SpawnResult.Builder().setExitCode(0).setStatus(Status.SUCCESS).build();
when(delegate.exec(any(Spawn.class), any(SpawnExecutionPolicy.class)))
.thenReturn(delegateResult);
SpawnResult result = runner.exec(simpleSpawn, simplePolicy);
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
// We use verify to check that each method is called the correct number of times.
verify(cache)
.uploadAllResults(any(Path.class), any(Collection.class), any(ActionResult.Builder.class));
// Two additional uploads for stdout and stderr.
verify(cache, Mockito.times(2)).uploadFileContents(any(Path.class));
- verify(cache).setCachedActionResult(any(ActionKey.class), any(ActionResult.class));
- assertThat(result.setupSuccess()).isTrue();
- assertThat(result.exitCode()).isEqualTo(0);
+ ActionResult actionResult =
+ ActionResult.newBuilder()
+ .setStdoutDigest(stdOutDigest)
+ .setStderrDigest(stdErrDigest)
+ .build();
+ verify(cache).setCachedActionResult(any(ActionKey.class), eq(actionResult));
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
index 13be580204..b51b9d6259 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
@@ -17,9 +17,8 @@ import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.ImmutableSet;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.protobuf.ByteString;
+import com.google.devtools.build.lib.remote.Chunker.Chunk;
+import com.google.devtools.remoteexecution.v1test.Digest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -28,37 +27,29 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class ChunkerTest {
- static BlobChunk buildChunk(long offset, String data) {
- return BlobChunk.newBuilder().setOffset(offset).setData(ByteString.copyFromUtf8(data)).build();
- }
-
- static BlobChunk buildChunk(ContentDigest digest, String data) {
- return BlobChunk.newBuilder().setDigest(digest).setData(ByteString.copyFromUtf8(data)).build();
- }
-
@Test
public void testChunker() throws Exception {
byte[] b1 = "abcdefg".getBytes(UTF_8);
byte[] b2 = "hij".getBytes(UTF_8);
byte[] b3 = "klmnopqrstuvwxyz".getBytes(UTF_8);
- ContentDigest d1 = ContentDigests.computeDigest(b1);
- ContentDigest d2 = ContentDigests.computeDigest(b2);
- ContentDigest d3 = ContentDigests.computeDigest(b3);
+ Digest d1 = Digests.computeDigest(b1);
+ Digest d2 = Digests.computeDigest(b2);
+ Digest d3 = Digests.computeDigest(b3);
Chunker c = new Chunker.Builder().chunkSize(5).addInput(b1).addInput(b2).addInput(b3).build();
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d1, "abcde"));
+ assertThat(c.next()).isEqualTo(new Chunk(d1, "abcde".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(5, "fg"));
+ assertThat(c.next()).isEqualTo(new Chunk(d1, "fg".getBytes(UTF_8), 5));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d2, "hij"));
+ assertThat(c.next()).isEqualTo(new Chunk(d2, "hij".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d3, "klmno"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "klmno".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(5, "pqrst"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "pqrst".getBytes(UTF_8), 5));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(10, "uvwxy"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "uvwxy".getBytes(UTF_8), 10));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(15, "z"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "z".getBytes(UTF_8), 15));
assertThat(c.hasNext()).isFalse();
}
@@ -68,8 +59,8 @@ public class ChunkerTest {
byte[] b2 = "bb".getBytes(UTF_8);
byte[] b3 = "ccc".getBytes(UTF_8);
byte[] b4 = "dddd".getBytes(UTF_8);
- ContentDigest d1 = ContentDigests.computeDigest(b1);
- ContentDigest d3 = ContentDigests.computeDigest(b3);
+ Digest d1 = Digests.computeDigest(b1);
+ Digest d3 = Digests.computeDigest(b3);
Chunker c =
new Chunker.Builder()
.chunkSize(2)
@@ -80,11 +71,11 @@ public class ChunkerTest {
.addInput(b4)
.build();
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d1, "a"));
+ assertThat(c.next()).isEqualTo(new Chunk(d1, "a".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d3, "cc"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "cc".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(2, "c"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "c".getBytes(UTF_8), 2));
assertThat(c.hasNext()).isFalse();
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java b/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java
index 74994aabdd..3193e0b66f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java
@@ -20,7 +20,9 @@ import com.google.common.hash.HashCode;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.Artifact;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.ByteString;
import java.io.IOException;
import javax.annotation.Nullable;
@@ -28,20 +30,21 @@ import javax.annotation.Nullable;
/** A fake implementation of the {@link ActionInputFileCache} interface. */
final class FakeActionInputFileCache implements ActionInputFileCache {
private final Path execRoot;
- private final BiMap<ActionInput, ByteString> cas = HashBiMap.create();
+ private final BiMap<ActionInput, String> cas = HashBiMap.create();
FakeActionInputFileCache(Path execRoot) {
this.execRoot = execRoot;
}
- void setDigest(ActionInput input, ByteString digest) {
+ void setDigest(ActionInput input, String digest) {
cas.put(input, digest);
}
@Override
@Nullable
public byte[] getDigest(ActionInput input) throws IOException {
- return Preconditions.checkNotNull(cas.get(input), input).toByteArray();
+ String hexDigest = Preconditions.checkNotNull(cas.get(input), input);
+ return HashCode.fromString(hexDigest).asBytes();
}
@Override
@@ -63,12 +66,20 @@ final class FakeActionInputFileCache implements ActionInputFileCache {
@Nullable
public ActionInput getInputFromDigest(ByteString hexDigest) {
HashCode code = HashCode.fromString(hexDigest.toStringUtf8());
- ByteString digest = ByteString.copyFrom(code.asBytes());
- return Preconditions.checkNotNull(cas.inverse().get(digest));
+ return Preconditions.checkNotNull(cas.inverse().get(code.toString()));
}
@Override
public Path getInputPath(ActionInput input) {
throw new UnsupportedOperationException();
}
+
+ public Digest createScratchInput(ActionInput input, String content) throws IOException {
+ Path inputFile = execRoot.getRelative(input.getExecPath());
+ FileSystemUtils.createDirectoryAndParents(inputFile.getParentDirectory());
+ FileSystemUtils.writeContentAsLatin1(inputFile, content);
+ Digest digest = Digests.computeDigest(inputFile);
+ setDigest(input, digest.getHash());
+ return digest;
+ }
} \ No newline at end of file
diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java
new file mode 100644
index 0000000000..4de1498207
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java
@@ -0,0 +1,56 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.common.collect.ImmutableMap;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+
+class FakeImmutableCacheByteStreamImpl extends ByteStreamImplBase {
+ private final Map<ReadRequest, ReadResponse> cannedReplies;
+
+ public FakeImmutableCacheByteStreamImpl(Map<Digest, String> contents) {
+ ImmutableMap.Builder<ReadRequest, ReadResponse> b = ImmutableMap.builder();
+ for (Map.Entry<Digest, String> e : contents.entrySet()) {
+ b.put(
+ ReadRequest.newBuilder()
+ .setResourceName("blobs/" + e.getKey().getHash() + "/" + e.getKey().getSizeBytes())
+ .build(),
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8(e.getValue())).build());
+ }
+ cannedReplies = b.build();
+ }
+
+ public FakeImmutableCacheByteStreamImpl(Digest digest, String contents) {
+ this(ImmutableMap.of(digest, contents));
+ }
+
+ public FakeImmutableCacheByteStreamImpl(Digest d1, String c1, Digest d2, String c2) {
+ this(ImmutableMap.of(d1, c1, d2, c2));
+ }
+
+ @Override
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ assertThat(cannedReplies.containsKey(request)).isTrue();
+ responseObserver.onNext(cannedReplies.get(request));
+ responseObserver.onCompleted();
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java
index c72e819820..3cb2bd218c 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java
@@ -15,78 +15,81 @@ package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.devtools.build.lib.actions.Root;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.testutil.Scratch;
-import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.vfs.FileSystem;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
-import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.MockitoAnnotations;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/** Tests for {@link GrpcActionCache}. */
@RunWith(JUnit4.class)
public class GrpcActionCacheTest {
- private final FakeRemoteCacheService fakeRemoteCacheService = new FakeRemoteCacheService();
-
- private final Server server =
- InProcessServerBuilder.forName(getClass().getSimpleName())
- .directExecutor()
- .addService(fakeRemoteCacheService)
- .build();
-
- private final ManagedChannel channel =
- InProcessChannelBuilder.forName(getClass().getSimpleName()).directExecutor().build();
- private Scratch scratch;
- private Root rootDir;
+ private FileSystem fs;
+ private Path execRoot;
+ private FakeActionInputFileCache fakeFileCache;
+ private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+ private final String fakeServerName = "fake server for " + getClass();
+ private Server fakeServer;
@Before
public final void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- scratch = new Scratch();
- rootDir = Root.asDerivedRoot(scratch.dir("/exec/root"));
- server.start();
+ // Use a mutable service registry for later registering the service impl for each test case.
+ fakeServer =
+ InProcessServerBuilder.forName(fakeServerName)
+ .fallbackHandlerRegistry(serviceRegistry)
+ .directExecutor()
+ .build()
+ .start();
+ Chunker.setDefaultChunkSizeForTesting(1000); // Enough for everything to be one chunk.
+ fs = new InMemoryFileSystem();
+ execRoot = fs.getPath("/exec/root");
+ FileSystemUtils.createDirectoryAndParents(execRoot);
+ fakeFileCache = new FakeActionInputFileCache(execRoot);
}
@After
- public void tearDown() {
- server.shutdownNow();
- channel.shutdownNow();
+ public void tearDown() throws Exception {
+ fakeServer.shutdownNow();
}
private static class ChannelOptionsInterceptor implements ClientInterceptor {
@@ -106,316 +109,332 @@ public class GrpcActionCacheTest {
}
private GrpcActionCache newClient() throws IOException {
- return newClient(Options.getDefaults(RemoteOptions.class),
- Options.getDefaults(AuthAndTLSOptions.class));
- }
+ AuthAndTLSOptions authTlsOptions = Options.getDefaults(AuthAndTLSOptions.class);
+ authTlsOptions.authEnabled = true;
+ authTlsOptions.authCredentials = "/exec/root/creds.json";
+ authTlsOptions.authScope = "dummy.scope";
+
+ GenericJson json = new GenericJson();
+ json.put("type", "authorized_user");
+ json.put("client_id", "some_client");
+ json.put("client_secret", "foo");
+ json.put("refresh_token", "bar");
+ Scratch scratch = new Scratch();
+ scratch.file(authTlsOptions.authCredentials, new JacksonFactory().toString(json));
- private GrpcActionCache newClient(RemoteOptions remoteOptions, AuthAndTLSOptions authTlsOptions)
- throws IOException {
ChannelOptions channelOptions =
- authTlsOptions.authCredentials != null
- ? ChannelOptions.create(
- authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes,
- scratch.resolve(authTlsOptions.authCredentials).getInputStream())
- : ChannelOptions.create(authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes);
+ ChannelOptions.create(
+ authTlsOptions, scratch.resolve(authTlsOptions.authCredentials).getInputStream());
return new GrpcActionCache(
ClientInterceptors.intercept(
- channel, ImmutableList.of(new ChannelOptionsInterceptor(channelOptions))),
- remoteOptions,
- channelOptions);
+ InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(),
+ ImmutableList.of(new ChannelOptionsInterceptor(channelOptions))),
+ channelOptions,
+ Options.getDefaults(RemoteOptions.class));
}
@Test
- public void testDownloadEmptyBlobs() throws Exception {
+ public void testDownloadEmptyBlob() throws Exception {
GrpcActionCache client = newClient();
- ContentDigest fooDigest = fakeRemoteCacheService.put("foo".getBytes(UTF_8));
- ContentDigest emptyDigest = ContentDigests.computeDigest(new byte[0]);
- ImmutableList<byte[]> results =
- client.downloadBlobs(ImmutableList.<ContentDigest>of(emptyDigest, fooDigest, emptyDigest));
- // Will not query the server for empty blobs.
- assertThat(new String(results.get(0), UTF_8)).isEmpty();
- assertThat(new String(results.get(1), UTF_8)).isEqualTo("foo");
- assertThat(new String(results.get(2), UTF_8)).isEmpty();
- // Will not call the server at all.
- assertThat(new String(client.downloadBlob(emptyDigest), UTF_8)).isEmpty();
+ Digest emptyDigest = Digests.computeDigest(new byte[0]);
+ // Will not call the mock Bytestream interface at all.
+ assertThat(client.downloadBlob(emptyDigest)).isEmpty();
}
@Test
- public void testDownloadBlobs() throws Exception {
- GrpcActionCache client = newClient();
- ContentDigest fooDigest = fakeRemoteCacheService.put("foo".getBytes(UTF_8));
- ContentDigest barDigest = fakeRemoteCacheService.put("bar".getBytes(UTF_8));
- ImmutableList<byte[]> results =
- client.downloadBlobs(ImmutableList.<ContentDigest>of(fooDigest, barDigest));
- assertThat(new String(results.get(0), UTF_8)).isEqualTo("foo");
- assertThat(new String(results.get(1), UTF_8)).isEqualTo("bar");
+ public void testDownloadBlobSingleChunk() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest digest = Digests.computeDigestUtf8("abcdefg");
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ assertThat(request.getResourceName().contains(digest.getHash())).isTrue();
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abcdefg")).build());
+ responseObserver.onCompleted();
+ }
+ });
+ assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
}
@Test
- public void testDownloadBlobsBatchChunk() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- options.grpcMaxBatchInputs = 10;
- options.grpcMaxChunkSizeBytes = 2;
- options.grpcMaxBatchSizeBytes = 10;
- options.remoteTimeout = 10;
- GrpcActionCache client = newClient(options, Options.getDefaults(AuthAndTLSOptions.class));
- ContentDigest fooDigest = fakeRemoteCacheService.put("fooooooo".getBytes(UTF_8));
- ContentDigest barDigest = fakeRemoteCacheService.put("baaaar".getBytes(UTF_8));
- ContentDigest s1Digest = fakeRemoteCacheService.put("1".getBytes(UTF_8));
- ContentDigest s2Digest = fakeRemoteCacheService.put("2".getBytes(UTF_8));
- ContentDigest s3Digest = fakeRemoteCacheService.put("3".getBytes(UTF_8));
- ImmutableList<byte[]> results =
- client.downloadBlobs(
- ImmutableList.<ContentDigest>of(fooDigest, barDigest, s1Digest, s2Digest, s3Digest));
- assertThat(new String(results.get(0), UTF_8)).isEqualTo("fooooooo");
- assertThat(new String(results.get(1), UTF_8)).isEqualTo("baaaar");
- assertThat(new String(results.get(2), UTF_8)).isEqualTo("1");
- assertThat(new String(results.get(3), UTF_8)).isEqualTo("2");
- assertThat(new String(results.get(4), UTF_8)).isEqualTo("3");
+ public void testDownloadBlobMultipleChunks() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest digest = Digests.computeDigestUtf8("abcdefg");
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ assertThat(request.getResourceName().contains(digest.getHash())).isTrue();
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build());
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build());
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("g")).build());
+ responseObserver.onCompleted();
+ }
+ });
+ assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
}
@Test
- public void testUploadBlobs() throws Exception {
+ public void testDownloadAllResults() throws Exception {
GrpcActionCache client = newClient();
- byte[] foo = "foo".getBytes(UTF_8);
- byte[] bar = "bar".getBytes(UTF_8);
- ContentDigest fooDigest = ContentDigests.computeDigest(foo);
- ContentDigest barDigest = ContentDigests.computeDigest(bar);
- ImmutableList<ContentDigest> digests = client.uploadBlobs(ImmutableList.<byte[]>of(foo, bar));
- assertThat(digests).containsExactly(fooDigest, barDigest);
- assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo);
- assertThat(fakeRemoteCacheService.get(barDigest)).isEqualTo(bar);
- }
+ Digest fooDigest = Digests.computeDigestUtf8("foo-contents");
+ Digest barDigest = Digests.computeDigestUtf8("bar-contents");
+ Digest emptyDigest = Digests.computeDigest(new byte[0]);
+ serviceRegistry.addService(
+ new FakeImmutableCacheByteStreamImpl(fooDigest, "foo-contents", barDigest, "bar-contents"));
- @Test
- public void testUploadBlobsBatchChunk() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- options.grpcMaxBatchInputs = 10;
- options.grpcMaxChunkSizeBytes = 2;
- options.grpcMaxBatchSizeBytes = 10;
- options.remoteTimeout = 10;
- GrpcActionCache client = newClient(options, Options.getDefaults(AuthAndTLSOptions.class));
-
- byte[] foo = "fooooooo".getBytes(UTF_8);
- byte[] bar = "baaaar".getBytes(UTF_8);
- byte[] s1 = "1".getBytes(UTF_8);
- byte[] s2 = "2".getBytes(UTF_8);
- byte[] s3 = "3".getBytes(UTF_8);
- ContentDigest fooDigest = ContentDigests.computeDigest(foo);
- ContentDigest barDigest = ContentDigests.computeDigest(bar);
- ContentDigest s1Digest = ContentDigests.computeDigest(s1);
- ContentDigest s2Digest = ContentDigests.computeDigest(s2);
- ContentDigest s3Digest = ContentDigests.computeDigest(s3);
- ImmutableList<ContentDigest> digests =
- client.uploadBlobs(ImmutableList.<byte[]>of(foo, bar, s1, s2, s3));
- assertThat(digests).containsExactly(fooDigest, barDigest, s1Digest, s2Digest, s3Digest);
- assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo);
- assertThat(fakeRemoteCacheService.get(barDigest)).isEqualTo(bar);
- assertThat(fakeRemoteCacheService.get(s1Digest)).isEqualTo(s1);
- assertThat(fakeRemoteCacheService.get(s2Digest)).isEqualTo(s2);
- assertThat(fakeRemoteCacheService.get(s3Digest)).isEqualTo(s3);
- }
-
- @Test
- public void testUploadAllResults() throws Exception {
- GrpcActionCache client = newClient();
- byte[] foo = "foo".getBytes(UTF_8);
- byte[] bar = "bar".getBytes(UTF_8);
- Path fooFile = scratch.file("/exec/root/a/foo", foo);
- Path emptyFile = scratch.file("/exec/root/b/empty");
- Path barFile = scratch.file("/exec/root/a/bar", bar);
- ContentDigest fooDigest = ContentDigests.computeDigest(fooFile);
- ContentDigest barDigest = ContentDigests.computeDigest(barFile);
- ContentDigest emptyDigest = ContentDigests.computeDigest(new byte[0]);
ActionResult.Builder result = ActionResult.newBuilder();
- client.uploadAllResults(
- rootDir.getPath(), ImmutableList.<Path>of(fooFile, emptyFile, barFile), result);
- assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo);
- assertThat(fakeRemoteCacheService.get(barDigest)).isEqualTo(bar);
- ActionResult.Builder expectedResult = ActionResult.newBuilder();
- expectedResult
- .addOutputBuilder()
- .setPath("a/foo")
- .getFileMetadataBuilder()
- .setDigest(fooDigest);
- expectedResult
- .addOutputBuilder()
- .setPath("b/empty")
- .getFileMetadataBuilder()
- .setDigest(emptyDigest);
- expectedResult
- .addOutputBuilder()
- .setPath("a/bar")
- .getFileMetadataBuilder()
- .setDigest(barDigest);
- assertThat(result.build()).isEqualTo(expectedResult.build());
+ result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
+ result.addOutputFilesBuilder().setPath("b/empty").setDigest(emptyDigest);
+ result.addOutputFilesBuilder().setPath("a/bar").setDigest(barDigest).setIsExecutable(true);
+ client.downloadAllResults(result.build(), execRoot);
+ assertThat(Digests.computeDigest(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
+ assertThat(Digests.computeDigest(execRoot.getRelative("b/empty"))).isEqualTo(emptyDigest);
+ assertThat(Digests.computeDigest(execRoot.getRelative("a/bar"))).isEqualTo(barDigest);
+ assertThat(execRoot.getRelative("a/bar").isExecutable()).isTrue();
}
@Test
- public void testDownloadAllResults() throws Exception {
- GrpcActionCache client = newClient();
- ContentDigest fooDigest = fakeRemoteCacheService.put("foo".getBytes(UTF_8));
- ContentDigest barDigest = fakeRemoteCacheService.put("bar".getBytes(UTF_8));
- ContentDigest emptyDigest = ContentDigests.computeDigest(new byte[0]);
- ActionResult.Builder result = ActionResult.newBuilder();
- result.addOutputBuilder().setPath("a/foo").getFileMetadataBuilder().setDigest(fooDigest);
- result.addOutputBuilder().setPath("b/empty").getFileMetadataBuilder().setDigest(emptyDigest);
- result.addOutputBuilder().setPath("a/bar").getFileMetadataBuilder().setDigest(barDigest);
- client.downloadAllResults(result.build(), rootDir.getPath());
- Path fooFile = rootDir.getPath().getRelative("a/foo");
- Path emptyFile = rootDir.getPath().getRelative("b/empty");
- Path barFile = rootDir.getPath().getRelative("a/bar");
- assertThat(ContentDigests.computeDigest(fooFile)).isEqualTo(fooDigest);
- assertThat(ContentDigests.computeDigest(emptyFile)).isEqualTo(emptyDigest);
- assertThat(ContentDigests.computeDigest(barFile)).isEqualTo(barDigest);
+ public void testUploadBlobCacheHit() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest digest = Digests.computeDigestUtf8("abcdefg");
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+ });
+ assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
}
@Test
- public void testAuthCredentials() throws Exception {
- AuthAndTLSOptions options = Options.getDefaults(AuthAndTLSOptions.class);
- options.authEnabled = true;
- options.authCredentials = "/exec/root/creds.json";
- options.authScope = "dummy.scope";
-
- GenericJson json = new GenericJson();
- json.put("type", "authorized_user");
- json.put("client_id", "some_client");
- json.put("client_secret", "foo");
- json.put("refresh_token", "bar");
- scratch.file(options.authCredentials, new JacksonFactory().toString(json));
-
- GrpcActionCache client = newClient(Options.getDefaults(RemoteOptions.class), options);
- byte[] foo = "foo".getBytes(UTF_8);
- ContentDigest fooDigest = ContentDigests.computeDigest(foo);
- ImmutableList<ContentDigest> digests = client.uploadBlobs(ImmutableList.<byte[]>of(foo));
- assertThat(digests).containsExactly(fooDigest);
- assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo);
+ public void testUploadBlobSingleChunk() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest digest = Digests.computeDigestUtf8("abcdefg");
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build());
+ responseObserver.onCompleted();
+ }
+ });
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(
+ final StreamObserver<WriteResponse> responseObserver) {
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest request) {
+ assertThat(request.getResourceName()).contains(digest.getHash());
+ assertThat(request.getFinishWrite()).isTrue();
+ assertThat(request.getData().toStringUtf8()).isEqualTo("abcdefg");
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(7).build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ fail("An error occurred: " + t);
+ }
+ };
+ }
+ });
+ assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
}
- private static class FakeRemoteCacheService extends CasServiceImplBase {
- private final ConcurrentMap<String, byte[]> cache = Maps.newConcurrentMap();
-
- public ContentDigest put(byte[] blob) {
- ContentDigest digest = ContentDigests.computeDigest(blob);
- cache.put(ContentDigests.toHexString(digest), blob);
- return digest;
+ static class TestChunkedRequestObserver implements StreamObserver<WriteRequest> {
+ private final StreamObserver<WriteResponse> responseObserver;
+ private final String contents;
+ private Chunker chunker;
+
+ public TestChunkedRequestObserver(
+ StreamObserver<WriteResponse> responseObserver, String contents, int chunkSizeBytes) {
+ this.responseObserver = responseObserver;
+ this.contents = contents;
+ try {
+ chunker = Chunker.from(contents.getBytes(UTF_8), chunkSizeBytes);
+ } catch (IOException e) {
+ fail("An error occurred:" + e);
+ }
}
- public byte[] get(ContentDigest digest) {
- return cache.get(ContentDigests.toHexString(digest));
+ @Override
+ public void onNext(WriteRequest request) {
+ assertThat(chunker.hasNext()).isTrue();
+ try {
+ Chunker.Chunk chunk = chunker.next();
+ Digest digest = chunk.getDigest();
+ long offset = chunk.getOffset();
+ byte[] data = chunk.getData();
+ if (offset == 0) {
+ assertThat(request.getResourceName()).contains(digest.getHash());
+ } else {
+ assertThat(request.getResourceName()).isEmpty();
+ }
+ assertThat(request.getFinishWrite())
+ .isEqualTo(offset + data.length == digest.getSizeBytes());
+ assertThat(request.getData().toByteArray()).isEqualTo(data);
+ } catch (IOException e) {
+ fail("An error occurred:" + e);
+ }
}
- public void clear() {
- cache.clear();
+ @Override
+ public void onCompleted() {
+ assertThat(chunker.hasNext()).isFalse();
+ responseObserver.onNext(
+ WriteResponse.newBuilder().setCommittedSize(contents.length()).build());
+ responseObserver.onCompleted();
}
@Override
- public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> observer) {
- CasLookupReply.Builder reply = CasLookupReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- for (ContentDigest digest : request.getDigestList()) {
- if (get(digest) == null) {
- status.addMissingDigest(digest);
- }
- }
- status.setSucceeded(true);
- observer.onNext(reply.build());
- observer.onCompleted();
+ public void onError(Throwable t) {
+ fail("An error occurred: " + t);
}
+ }
- @Override
- public void downloadBlob(
- CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> observer) {
- CasDownloadReply.Builder reply = CasDownloadReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- boolean success = true;
- for (ContentDigest digest : request.getDigestList()) {
- if (get(digest) == null) {
- status.addMissingDigest(digest);
- success = false;
- }
- }
- if (!success) {
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- status.setSucceeded(false);
- observer.onNext(reply.build());
- observer.onCompleted();
- return;
+ private Answer<StreamObserver<WriteRequest>> blobChunkedWriteAnswer(
+ final String contents, final int chunkSize) {
+ return new Answer<StreamObserver<WriteRequest>>() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) {
+ return new TestChunkedRequestObserver(
+ (StreamObserver<WriteResponse>) invocation.getArguments()[0], contents, chunkSize);
}
- // We change the order on purpose, to test for blobs out of order:
- for (ContentDigest digest : Lists.reverse(request.getDigestList())) {
- observer.onNext(
- CasDownloadReply.newBuilder()
- .setStatus(CasStatus.newBuilder().setSucceeded(true))
- .setData(
- BlobChunk.newBuilder()
- .setDigest(digest)
- .setData(ByteString.copyFrom(get(digest))))
- .build());
- }
- observer.onCompleted();
+ };
+ }
+
+ @Test
+ public void testUploadBlobMultipleChunks() throws Exception {
+ final Digest digest = Digests.computeDigestUtf8("abcdef");
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build());
+ responseObserver.onCompleted();
+ }
+ });
+
+ ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
+ serviceRegistry.addService(mockByteStreamImpl);
+ for (int chunkSize = 1; chunkSize <= 6; ++chunkSize) {
+ GrpcActionCache client = newClient();
+ Chunker.setDefaultChunkSizeForTesting(chunkSize);
+ when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
+ .thenAnswer(blobChunkedWriteAnswer("abcdef", chunkSize));
+ assertThat(client.uploadBlob("abcdef".getBytes(UTF_8))).isEqualTo(digest);
}
+ }
- @Override
- public StreamObserver<CasUploadBlobRequest> uploadBlob(
- final StreamObserver<CasUploadBlobReply> responseObserver) {
- return new StreamObserver<CasUploadBlobRequest>() {
- byte[] blob = null;
- ContentDigest digest = null;
- long offset = 0;
-
- @Override
- public void onNext(CasUploadBlobRequest request) {
- BlobChunk chunk = request.getData();
- try {
- if (chunk.hasDigest()) {
- // Check if the previous chunk was really done.
- Preconditions.checkArgument(
- digest == null || offset == 0,
- "Missing input chunk for digest %s",
- digest == null ? "" : ContentDigests.toString(digest));
- digest = chunk.getDigest();
- blob = new byte[(int) digest.getSizeBytes()];
- }
- Preconditions.checkArgument(digest != null, "First chunk contains no digest");
- Preconditions.checkArgument(
- offset == chunk.getOffset(),
- "Missing input chunk for digest %s",
- ContentDigests.toString(digest));
- if (digest.getSizeBytes() > 0) {
- chunk.getData().copyTo(blob, (int) offset);
- offset = (offset + chunk.getData().size()) % digest.getSizeBytes();
- }
- if (offset == 0) {
- ContentDigest uploadedDigest = put(blob);
- Preconditions.checkArgument(
- uploadedDigest.equals(digest),
- "Digest mismatch: client sent %s, server computed %s",
- ContentDigests.toString(digest),
- ContentDigests.toString(uploadedDigest));
- }
- } catch (Exception e) {
- CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(
- e instanceof IllegalArgumentException
- ? CasStatus.ErrorCode.INVALID_ARGUMENT
- : CasStatus.ErrorCode.UNKNOWN)
- .setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
+ @Test
+ public void testUploadAllResultsCacheHits() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest fooDigest =
+ fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
+ final Digest barDigest =
+ fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x");
+ final Path fooFile = execRoot.getRelative("a/foo");
+ final Path barFile = execRoot.getRelative("bar");
+ barFile.setExecutable(true);
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ assertThat(request)
+ .isEqualTo(
+ FindMissingBlobsRequest.newBuilder()
+ .addBlobDigests(fooDigest)
+ .addBlobDigests(barDigest)
+ .build());
+ // Nothing is missing.
+ responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
+ responseObserver.onCompleted();
}
- }
+ });
- @Override
- public void onError(Throwable t) {}
+ ActionResult.Builder result = ActionResult.newBuilder();
+ client.uploadAllResults(execRoot, ImmutableList.<Path>of(fooFile, barFile), result);
+ ActionResult.Builder expectedResult = ActionResult.newBuilder();
+ expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
+ expectedResult
+ .addOutputFilesBuilder()
+ .setPath("bar")
+ .setDigest(barDigest)
+ .setIsExecutable(true);
+ assertThat(result.build()).isEqualTo(expectedResult.build());
+ }
- @Override
- public void onCompleted() {
- responseObserver.onCompleted();
- }
- };
- }
+ @Test
+ public void testUploadAllResultsCacheMisses() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest fooDigest =
+ fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
+ final Digest barDigest =
+ fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x");
+ final Path fooFile = execRoot.getRelative("a/foo");
+ final Path barFile = execRoot.getRelative("bar");
+ barFile.setExecutable(true);
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ assertThat(request)
+ .isEqualTo(
+ FindMissingBlobsRequest.newBuilder()
+ .addBlobDigests(fooDigest)
+ .addBlobDigests(barDigest)
+ .build());
+ // Both are missing.
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder()
+ .addMissingBlobDigests(fooDigest)
+ .addMissingBlobDigests(barDigest)
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
+ ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
+ serviceRegistry.addService(mockByteStreamImpl);
+ when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
+ .thenAnswer(blobChunkedWriteAnswer("xyz", 3))
+ .thenAnswer(blobChunkedWriteAnswer("x", 1));
+
+ ActionResult.Builder result = ActionResult.newBuilder();
+ client.uploadAllResults(execRoot, ImmutableList.<Path>of(fooFile, barFile), result);
+ ActionResult.Builder expectedResult = ActionResult.newBuilder();
+ expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
+ expectedResult
+ .addOutputFilesBuilder()
+ .setPath("bar")
+ .setDigest(barDigest)
+ .setIsExecutable(true);
+ assertThat(result.build()).isEqualTo(expectedResult.build());
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index fc195ab3de..c648fae09f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -14,12 +14,16 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.verify;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.ActionInputHelper;
@@ -32,14 +36,6 @@ import com.google.devtools.build.lib.exec.SpawnResult;
import com.google.devtools.build.lib.exec.SpawnRunner.ProgressStatus;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionPolicy;
import com.google.devtools.build.lib.exec.util.FakeOwner;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -47,118 +43,154 @@ import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
+import com.google.longrunning.Operation;
+import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import io.grpc.Channel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.protobuf.StatusProto;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.Set;
import java.util.SortedMap;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/** Tests for {@link RemoteSpawnRunner} in combination with {@link GrpcRemoteExecutor}. */
@RunWith(JUnit4.class)
public class GrpcRemoteExecutionClientTest {
- private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = new ArtifactExpander() {
- @Override
- public void expand(Artifact artifact, Collection<? super Artifact> output) {
- output.add(artifact);
- }
- };
+ private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER =
+ new ArtifactExpander() {
+ @Override
+ public void expand(Artifact artifact, Collection<? super Artifact> output) {
+ output.add(artifact);
+ }
+ };
+ private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
private FileSystem fs;
private Path execRoot;
private SimpleSpawn simpleSpawn;
private FakeActionInputFileCache fakeFileCache;
-
+ private Digest inputDigest;
+ private RemoteSpawnRunner client;
private FileOutErr outErr;
- private long timeoutMillis = 0;
-
- private final SpawnExecutionPolicy simplePolicy = new SpawnExecutionPolicy() {
- @Override
- public void lockOutputFiles() throws InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ActionInputFileCache getActionInputFileCache() {
- return fakeFileCache;
- }
-
- @Override
- public long getTimeoutMillis() {
- return timeoutMillis;
- }
-
- @Override
- public FileOutErr getFileOutErr() {
- return outErr;
- }
-
- @Override
- public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException {
- return new SpawnInputExpander(/*strict*/false)
- .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace");
- }
-
- @Override
- public void report(ProgressStatus state) {
- // TODO(ulfjack): Test that the right calls are made.
- }
- };
+ private Server fakeServer;
+
+ private final SpawnExecutionPolicy simplePolicy =
+ new SpawnExecutionPolicy() {
+ @Override
+ public void lockOutputFiles() throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ActionInputFileCache getActionInputFileCache() {
+ return fakeFileCache;
+ }
+
+ @Override
+ public long getTimeoutMillis() {
+ return 0;
+ }
+
+ @Override
+ public FileOutErr getFileOutErr() {
+ return outErr;
+ }
+
+ @Override
+ public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException {
+ return new SpawnInputExpander(/*strict*/ false)
+ .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace");
+ }
+
+ @Override
+ public void report(ProgressStatus state) {
+ // TODO(ulfjack): Test that the right calls are made.
+ }
+ };
@Before
public final void setUp() throws Exception {
+ String fakeServerName = "fake server for " + getClass();
+ // Use a mutable service registry for later registering the service impl for each test case.
+ fakeServer =
+ InProcessServerBuilder.forName(fakeServerName)
+ .fallbackHandlerRegistry(serviceRegistry)
+ .directExecutor()
+ .build()
+ .start();
+
+ Chunker.setDefaultChunkSizeForTesting(1000); // Enough for everything to be one chunk.
fs = new InMemoryFileSystem();
execRoot = fs.getPath("/exec/root");
FileSystemUtils.createDirectoryAndParents(execRoot);
fakeFileCache = new FakeActionInputFileCache(execRoot);
- simpleSpawn = new SimpleSpawn(
- new FakeOwner("Mnemonic", "Progress Message"),
- ImmutableList.of("/bin/echo", "Hi!"),
- ImmutableMap.of("VARIABLE", "value"),
- /*executionInfo=*/ImmutableMap.<String, String>of(),
- /*inputs=*/ImmutableList.of(ActionInputHelper.fromPath("input")),
- /*outputs=*/ImmutableList.<ActionInput>of(),
- ResourceSet.ZERO
- );
+ simpleSpawn =
+ new SimpleSpawn(
+ new FakeOwner("Mnemonic", "Progress Message"),
+ ImmutableList.of("/bin/echo", "Hi!"),
+ ImmutableMap.of("VARIABLE", "value"),
+ /*executionInfo=*/ ImmutableMap.<String, String>of(),
+ /*inputs=*/ ImmutableList.of(ActionInputHelper.fromPath("input")),
+ /*outputs=*/ ImmutableList.<ActionInput>of(),
+ ResourceSet.ZERO);
Path stdout = fs.getPath("/tmp/stdout");
Path stderr = fs.getPath("/tmp/stderr");
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
outErr = new FileOutErr(stdout, stderr);
+ RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+ Channel channel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
+ GrpcRemoteExecutor executor = new GrpcRemoteExecutor(channel, ChannelOptions.DEFAULT, options);
+ GrpcActionCache remoteCache = new GrpcActionCache(channel, ChannelOptions.DEFAULT, options);
+ client = new RemoteSpawnRunner(execRoot, options, executor, remoteCache);
+ inputDigest = fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz");
}
- private void scratch(ActionInput input, String content) throws IOException {
- Path inputFile = execRoot.getRelative(input.getExecPath());
- FileSystemUtils.writeContentAsLatin1(inputFile, content);
- fakeFileCache.setDigest(
- simpleSpawn.getInputFiles().get(0), ByteString.copyFrom(inputFile.getSHA1Digest()));
+ @After
+ public void tearDown() throws Exception {
+ fakeServer.shutdownNow();
}
@Test
public void cacheHit() throws Exception {
- GrpcCasInterface casIface = Mockito.mock(GrpcCasInterface.class);
- GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class);
- GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class);
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- GrpcRemoteExecutor executor =
- new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
- RemoteSpawnRunner client = new RemoteSpawnRunner(execRoot, options, executor);
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
-
- ExecutionCacheReply reply = ExecutionCacheReply.newBuilder()
- .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true))
- .setResult(ActionResult.newBuilder().setReturnCode(0))
- .build();
- when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onNext(ActionResult.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+ });
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
- verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.hasRecordedOutput()).isFalse();
@@ -167,69 +199,154 @@ public class GrpcRemoteExecutionClientTest {
@Test
public void cacheHitWithOutput() throws Exception {
- InMemoryCas casIface = new InMemoryCas();
- GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class);
- GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class);
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- GrpcRemoteExecutor executor =
- new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
- RemoteSpawnRunner client = new RemoteSpawnRunner(execRoot, options, executor);
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
- byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
- byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8);
- ContentDigest stdOutDigest = casIface.put(cacheStdOut);
- ContentDigest stdErrDigest = casIface.put(cacheStdErr);
-
- ExecutionCacheReply reply = ExecutionCacheReply.newBuilder()
- .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true))
- .setResult(ActionResult.newBuilder()
- .setReturnCode(0)
- .setStdoutDigest(stdOutDigest)
- .setStderrDigest(stdErrDigest))
- .build();
- when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
+ final Digest stdOutDigest = Digests.computeDigestUtf8("stdout");
+ final Digest stdErrDigest = Digests.computeDigestUtf8("stderr");
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onNext(
+ ActionResult.newBuilder()
+ .setStdoutDigest(stdOutDigest)
+ .setStderrDigest(stdErrDigest)
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
+ serviceRegistry.addService(
+ new FakeImmutableCacheByteStreamImpl(stdOutDigest, "stdout", stdErrDigest, "stderr"));
+
+ SpawnResult result = client.exec(simpleSpawn, simplePolicy);
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
+ assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
+ assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
+ }
+
+ @Test
+ public void cacheHitWithInlineOutput() throws Exception {
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onNext(
+ ActionResult.newBuilder()
+ .setStdoutRaw(ByteString.copyFromUtf8("stdout"))
+ .setStderrRaw(ByteString.copyFromUtf8("stderr"))
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
- verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
}
+ private Answer<StreamObserver<WriteRequest>> blobWriteAnswer(final byte[] data) {
+ final Digest digest = Digests.computeDigest(data);
+ return new Answer<StreamObserver<WriteRequest>>() {
+ @Override
+ public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ final StreamObserver<WriteResponse> responseObserver =
+ (StreamObserver<WriteResponse>) invocation.getArguments()[0];
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest request) {
+ assertThat(request.getResourceName()).contains(digest.getHash());
+ assertThat(request.getFinishWrite()).isTrue();
+ assertThat(request.getData().toByteArray()).isEqualTo(data);
+ responseObserver.onNext(
+ WriteResponse.newBuilder().setCommittedSize(request.getData().size()).build());
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ fail("An error occurred: " + t);
+ }
+ };
+ }
+ };
+ }
+
@Test
public void remotelyExecute() throws Exception {
- InMemoryCas casIface = new InMemoryCas();
- GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class);
- GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class);
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- GrpcRemoteExecutor executor =
- new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
- RemoteSpawnRunner client = new RemoteSpawnRunner(execRoot, options, executor);
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
- byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
- byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8);
- ContentDigest stdOutDigest = casIface.put(cacheStdOut);
- ContentDigest stdErrDigest = casIface.put(cacheStdErr);
-
- ExecutionCacheReply reply = ExecutionCacheReply.newBuilder()
- .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true))
- .build();
- when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
-
- when(executionIface.execute(any(ExecuteRequest.class))).thenReturn(ImmutableList.of(
- ExecuteReply.newBuilder()
- .setStatus(ExecutionStatus.newBuilder().setSucceeded(true))
- .setResult(ActionResult.newBuilder()
- .setReturnCode(0)
- .setStdoutDigest(stdOutDigest)
- .setStderrDigest(stdErrDigest))
- .build()).iterator());
+ // getActionResult mock returns null by default, meaning a cache miss.
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onError(
+ StatusProto.toStatusRuntimeException(
+ Status.newBuilder().setCode(Code.NOT_FOUND.getNumber()).build()));
+ }
+ });
+ final ActionResult actionResult =
+ ActionResult.newBuilder()
+ .setStdoutRaw(ByteString.copyFromUtf8("stdout"))
+ .setStderrRaw(ByteString.copyFromUtf8("stderr"))
+ .build();
+ serviceRegistry.addService(
+ new ExecutionImplBase() {
+ @Override
+ public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
+ responseObserver.onNext(
+ Operation.newBuilder()
+ .setDone(true)
+ .setResponse(
+ Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build()))
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
+ final Command command =
+ Command.newBuilder()
+ .addAllArguments(ImmutableList.of("/bin/echo", "Hi!"))
+ .addEnvironmentVariables(
+ Command.EnvironmentVariable.newBuilder()
+ .setName("VARIABLE")
+ .setValue("value")
+ .build())
+ .build();
+ final Digest cmdDigest = Digests.computeDigest(command);
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ FindMissingBlobsResponse.Builder b = FindMissingBlobsResponse.newBuilder();
+ final Set<Digest> requested = ImmutableSet.copyOf(request.getBlobDigestsList());
+ if (requested.contains(cmdDigest)) {
+ b.addMissingBlobDigests(cmdDigest);
+ } else if (requested.contains(inputDigest)) {
+ b.addMissingBlobDigests(inputDigest);
+ } else {
+ fail("Unexpected call to findMissingBlobs: " + request);
+ }
+ responseObserver.onNext(b.build());
+ responseObserver.onCompleted();
+ }
+ });
+
+ ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
+ when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
+ .thenAnswer(blobWriteAnswer(command.toByteArray()))
+ .thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8)));
+ serviceRegistry.addService(mockByteStreamImpl);
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
- verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
diff --git a/src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java b/src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java
deleted file mode 100644
index cba62a536f..0000000000
--- a/src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.common.base.Preconditions;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.protobuf.ByteString;
-import io.grpc.stub.StreamObserver;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/** An in-memory implementation of GrpcCasInterface. */
-final class InMemoryCas implements GrpcCasInterface {
- private final Map<ByteString, ByteString> content = new HashMap<>();
-
- public ContentDigest put(byte[] data) {
- ContentDigest digest = ContentDigests.computeDigest(data);
- ByteString key = digest.getDigest();
- ByteString value = ByteString.copyFrom(data);
- content.put(key, value);
- return digest;
- }
-
- @Override
- public CasLookupReply lookup(CasLookupRequest request) {
- CasStatus.Builder result = CasStatus.newBuilder();
- for (ContentDigest digest : request.getDigestList()) {
- ByteString key = digest.getDigest();
- if (!content.containsKey(key)) {
- result.addMissingDigest(digest);
- }
- }
- if (result.getMissingDigestCount() != 0) {
- result.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- } else {
- result.setSucceeded(true);
- }
- return CasLookupReply.newBuilder().setStatus(result).build();
- }
-
- @Override
- public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) {
- return CasUploadTreeMetadataReply.newBuilder()
- .setStatus(CasStatus.newBuilder().setSucceeded(true))
- .build();
- }
-
- @Override
- public CasDownloadTreeMetadataReply downloadTreeMetadata(
- CasDownloadTreeMetadataRequest request) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) {
- List<CasDownloadReply> result = new ArrayList<>();
- for (ContentDigest digest : request.getDigestList()) {
- CasDownloadReply.Builder builder = CasDownloadReply.newBuilder();
- ByteString item = content.get(digest.getDigest());
- if (item != null) {
- builder.setStatus(CasStatus.newBuilder().setSucceeded(true));
- builder.setData(BlobChunk.newBuilder().setData(item).setDigest(digest));
- } else {
- throw new IllegalStateException();
- }
- result.add(builder.build());
- }
- return result.iterator();
- }
-
- @Override
- public StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
- final StreamObserver<CasUploadBlobReply> responseObserver) {
- return new StreamObserver<CasUploadBlobRequest>() {
- private ContentDigest digest;
- private ByteArrayOutputStream current;
-
- @Override
- public void onNext(CasUploadBlobRequest value) {
- BlobChunk chunk = value.getData();
- if (chunk.hasDigest()) {
- Preconditions.checkState(digest == null);
- digest = chunk.getDigest();
- current = new ByteArrayOutputStream();
- }
- try {
- current.write(chunk.getData().toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- responseObserver.onNext(
- CasUploadBlobReply.newBuilder()
- .setStatus(CasStatus.newBuilder().setSucceeded(true))
- .build());
- }
-
- @Override
- public void onError(Throwable t) {
- throw new RuntimeException(t);
- }
-
- @Override
- public void onCompleted() {
- ContentDigest check = ContentDigests.computeDigest(current.toByteArray());
- Preconditions.checkState(check.equals(digest), "%s != %s", digest, check);
- ByteString key = digest.getDigest();
- ByteString value = ByteString.copyFrom(current.toByteArray());
- digest = null;
- current = null;
- content.put(key, value);
- responseObserver.onCompleted();
- }
- };
- }
-} \ No newline at end of file
diff --git a/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java
index f9ef9111a6..94374f6364 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java
@@ -22,14 +22,14 @@ import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.Root;
import com.google.devtools.build.lib.exec.SingleBuildFileCache;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.testutil.Scratch;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystem.HashFunction;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
import java.util.ArrayList;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -54,8 +54,8 @@ public class TreeNodeRepositoryTest {
}
private TreeNodeRepository createTestTreeNodeRepository() {
- ActionInputFileCache inputFileCache = new SingleBuildFileCache(
- rootPath.getPathString(), scratch.getFileSystem());
+ ActionInputFileCache inputFileCache =
+ new SingleBuildFileCache(rootPath.getPathString(), scratch.getFileSystem());
return new TreeNodeRepository(rootPath, inputFileCache);
}
@@ -86,34 +86,26 @@ public class TreeNodeRepositoryTest {
TreeNode barNode = aNode.getChildEntries().get(0).getChild();
repo.computeMerkleDigests(root);
- ImmutableCollection<ContentDigest> digests = repo.getAllDigests(root);
- ContentDigest rootDigest = repo.getMerkleDigest(root);
- ContentDigest aDigest = repo.getMerkleDigest(aNode);
- ContentDigest fooDigest = repo.getMerkleDigest(fooNode);
- ContentDigest fooContentsDigest = ContentDigests.computeDigest(foo.getPath());
- ContentDigest barDigest = repo.getMerkleDigest(barNode);
- ContentDigest barContentsDigest = ContentDigests.computeDigest(bar.getPath());
- assertThat(digests)
- .containsExactly(
- rootDigest, aDigest, barDigest, barContentsDigest, fooDigest, fooContentsDigest);
+ ImmutableCollection<Digest> digests = repo.getAllDigests(root);
+ Digest rootDigest = repo.getMerkleDigest(root);
+ Digest aDigest = repo.getMerkleDigest(aNode);
+ Digest fooDigest = repo.getMerkleDigest(fooNode); // The contents digest.
+ Digest barDigest = repo.getMerkleDigest(barNode);
+ assertThat(digests).containsExactly(rootDigest, aDigest, barDigest, fooDigest);
- ArrayList<FileNode> fileNodes = new ArrayList<>();
+ ArrayList<Directory> directories = new ArrayList<>();
ArrayList<ActionInput> actionInputs = new ArrayList<>();
- repo.getDataFromDigests(digests, actionInputs, fileNodes);
+ repo.getDataFromDigests(digests, actionInputs, directories);
assertThat(actionInputs).containsExactly(bar, foo);
- assertThat(fileNodes).hasSize(4);
- FileNode rootFileNode = fileNodes.get(0);
- assertThat(rootFileNode.getChild(0).getPath()).isEqualTo("a");
- assertThat(rootFileNode.getChild(0).getDigest()).isEqualTo(aDigest);
- FileNode aFileNode = fileNodes.get(1);
- assertThat(aFileNode.getChild(0).getPath()).isEqualTo("bar");
- assertThat(aFileNode.getChild(0).getDigest()).isEqualTo(barDigest);
- assertThat(aFileNode.getChild(1).getPath()).isEqualTo("foo");
- assertThat(aFileNode.getChild(1).getDigest()).isEqualTo(fooDigest);
- FileNode barFileNode = fileNodes.get(2);
- assertThat(barFileNode.getFileMetadata().getDigest()).isEqualTo(barContentsDigest);
- FileNode fooFileNode = fileNodes.get(3);
- assertThat(fooFileNode.getFileMetadata().getDigest()).isEqualTo(fooContentsDigest);
+ assertThat(directories).hasSize(2);
+ Directory rootDirectory = directories.get(0);
+ assertThat(rootDirectory.getDirectories(0).getName()).isEqualTo("a");
+ assertThat(rootDirectory.getDirectories(0).getDigest()).isEqualTo(aDigest);
+ Directory aDirectory = directories.get(1);
+ assertThat(aDirectory.getFiles(0).getName()).isEqualTo("bar");
+ assertThat(aDirectory.getFiles(0).getDigest()).isEqualTo(barDigest);
+ assertThat(aDirectory.getFiles(1).getName()).isEqualTo("foo");
+ assertThat(aDirectory.getFiles(1).getDigest()).isEqualTo(fooDigest);
}
@Test
@@ -124,8 +116,8 @@ public class TreeNodeRepositoryTest {
TreeNodeRepository repo = createTestTreeNodeRepository();
TreeNode root = repo.buildFromActionInputs(ImmutableList.<ActionInput>of(foo1, foo2, foo3));
repo.computeMerkleDigests(root);
- // Reusing same node for the "foo" subtree: only need the root, root child, foo, and contents:
- assertThat(repo.getAllDigests(root)).hasSize(4);
+ // Reusing same node for the "foo" subtree: only need the root, root child, and foo contents:
+ assertThat(repo.getAllDigests(root)).hasSize(3);
}
@Test
@@ -141,16 +133,12 @@ public class TreeNodeRepositoryTest {
TreeNode aNode = root.getChildEntries().get(0).getChild();
TreeNode fooNode = aNode.getChildEntries().get(1).getChild(); // foo > bar in sort order!
TreeNode barNode = aNode.getChildEntries().get(0).getChild();
- ImmutableCollection<ContentDigest> digests = repo.getAllDigests(root);
- ContentDigest rootDigest = repo.getMerkleDigest(root);
- ContentDigest aDigest = repo.getMerkleDigest(aNode);
- ContentDigest fooDigest = repo.getMerkleDigest(fooNode);
- ContentDigest fooContentsDigest = ContentDigests.computeDigest(foo.getPath());
- ContentDigest barDigest = repo.getMerkleDigest(barNode);
- ContentDigest barContentsDigest = ContentDigests.computeDigest(new byte[0]);
- assertThat(digests)
- .containsExactly(
- rootDigest, aDigest, barDigest, barContentsDigest, fooDigest, fooContentsDigest);
+ ImmutableCollection<Digest> digests = repo.getAllDigests(root);
+ Digest rootDigest = repo.getMerkleDigest(root);
+ Digest aDigest = repo.getMerkleDigest(aNode);
+ Digest fooDigest = repo.getMerkleDigest(fooNode);
+ Digest barDigest = repo.getMerkleDigest(barNode);
+ assertThat(digests).containsExactly(rootDigest, aDigest, barDigest, fooDigest);
}
@Test
diff --git a/src/test/shell/bazel/remote_execution_test.sh b/src/test/shell/bazel/remote_execution_test.sh
index 6ed3101589..37515bce29 100755
--- a/src/test/shell/bazel/remote_execution_test.sh
+++ b/src/test/shell/bazel/remote_execution_test.sh
@@ -30,7 +30,6 @@ function set_up() {
${bazel_data}/src/tools/remote_worker/remote_worker \
--work_path=${work_path} \
--listen_port=${worker_port} \
- --grpc_max_chunk_size_bytes=120000000 \
--hazelcast_standalone_listen_port=${hazelcast_port} \
--pid_file=${pid_file} >& $TEST_log &
local wait_seconds=0
@@ -187,7 +186,6 @@ EOF
bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \
--spawn_strategy=remote \
--noremote_local_fallback \
- --grpc_max_chunk_size_bytes=120000000 \
--remote_executor=localhost:${worker_port} \
--remote_cache=localhost:${worker_port} \
//a:large_output >& $TEST_log \
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
index a91e5e5d73..ddbb5ede52 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
@@ -25,13 +25,19 @@ java_library(
"//src/main/java/com/google/devtools/build/lib:vfs",
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/common/options",
- "//src/main/protobuf:remote_protocol_java_grpc",
- "//src/main/protobuf:remote_protocol_java_proto",
"//third_party:guava",
"//third_party:hazelcast",
"//third_party:netty",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
"//third_party/protobuf:protobuf_java_util",
+ "@googleapis//:google_bytestream_bytestream_java_grpc",
+ "@googleapis//:google_bytestream_bytestream_java_proto",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
+ "@googleapis//:google_longrunning_operations_java_proto",
+ "@googleapis//:google_rpc_code_java_proto",
+ "@googleapis//:google_rpc_error_details_java_proto",
+ "@googleapis//:google_rpc_status_java_proto",
],
)
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index 8558be720f..0eee6695e1 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -14,43 +14,16 @@
package com.google.devtools.build.remote;
-import com.google.common.collect.ImmutableList;
-import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
-import com.google.devtools.build.lib.exec.SpawnResult.Status;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.devtools.build.lib.remote.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase;
-import com.google.devtools.build.lib.remote.ChannelOptions;
import com.google.devtools.build.lib.remote.Chunker;
-import com.google.devtools.build.lib.remote.ContentDigests;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase;
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceImplBase;
+import com.google.devtools.build.lib.remote.Digests;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.RemoteOptions;
-import com.google.devtools.build.lib.remote.RemoteProtocol;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command.EnvironmentEntry;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
import com.google.devtools.build.lib.remote.SimpleBlobStore;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory;
@@ -61,23 +34,48 @@ import com.google.devtools.build.lib.shell.CommandResult;
import com.google.devtools.build.lib.shell.TimeoutKillableObserver;
import com.google.devtools.build.lib.unix.UnixFileSystem;
import com.google.devtools.build.lib.util.OS;
-import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.ProcessUtils;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.JavaIoFileSystem;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.common.options.Options;
import com.google.devtools.common.options.OptionsParser;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
+import com.google.devtools.remoteexecution.v1test.UpdateBlobRequest;
+import com.google.longrunning.Operation;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
+import com.google.rpc.BadRequest;
+import com.google.rpc.BadRequest.FieldViolation;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
import io.grpc.Server;
+import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyServerBuilder;
+import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
+import java.io.StringWriter;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -88,6 +86,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.annotation.Nullable;
/**
* Implements a remote worker that accepts work items as protobufs. The server implementation is
@@ -96,13 +95,10 @@ import java.util.logging.Logger;
public class RemoteWorker {
private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName());
private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER);
-
- private static final int LOCAL_EXEC_ERROR = -1;
- private static final int SIGALRM_EXIT_CODE = /*SIGNAL_BASE=*/128 + /*SIGALRM=*/14;
-
- private final CasServiceImplBase casServer;
- private final ExecuteServiceImplBase execServer;
- private final ExecutionCacheServiceImplBase execCacheServer;
+ private final ContentAddressableStorageImplBase casServer;
+ private final ByteStreamImplBase bsServer;
+ private final ExecutionImplBase execServer;
+ private final ActionCacheImplBase actionCacheServer;
private final SimpleBlobStoreActionCache cache;
private final RemoteWorkerOptions workerOptions;
private final RemoteOptions remoteOptions;
@@ -123,16 +119,16 @@ public class RemoteWorker {
execServer = null;
}
casServer = new CasServer();
- execCacheServer = new ExecutionCacheServer();
+ bsServer = new ByteStreamServer();
+ actionCacheServer = new ActionCacheServer();
}
public Server startServer() throws IOException {
NettyServerBuilder b =
NettyServerBuilder.forPort(workerOptions.listenPort)
- .maxMessageSize(ChannelOptions.create(Options.getDefaults(AuthAndTLSOptions.class),
- remoteOptions.grpcMaxChunkSizeBytes).maxMessageSize())
.addService(casServer)
- .addService(execCacheServer);
+ .addService(bsServer)
+ .addService(actionCacheServer);
if (execServer != null) {
b.addService(execServer);
} else {
@@ -147,215 +143,258 @@ public class RemoteWorker {
return server;
}
- class CasServer extends CasServiceImplBase {
- private static final int MAX_MEMORY_KBYTES = 512 * 1024;
+ private static @Nullable Digest parseDigestFromResourceName(String resourceName) {
+ try {
+ String[] tokens = resourceName.split("/");
+ if (tokens.length < 2) {
+ return null;
+ }
+ String hash = tokens[tokens.length - 2];
+ long size = Long.parseLong(tokens[tokens.length - 1]);
+ return Digests.buildDigest(hash, size);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+
+ private static StatusRuntimeException internalError(Exception e) {
+ return StatusProto.toStatusRuntimeException(internalErrorStatus(e));
+ }
+
+ private static com.google.rpc.Status internalErrorStatus(Exception e) {
+ return Status.newBuilder()
+ .setCode(Code.INTERNAL.getNumber())
+ .setMessage("Internal error: " + e)
+ .build();
+ }
+
+ private static StatusRuntimeException notFoundError(Digest digest) {
+ return StatusProto.toStatusRuntimeException(notFoundStatus(digest));
+ }
+
+ private static com.google.rpc.Status notFoundStatus(Digest digest) {
+ return Status.newBuilder()
+ .setCode(Code.NOT_FOUND.getNumber())
+ .setMessage("Digest not found:" + digest)
+ .build();
+ }
+
+ private static StatusRuntimeException invalidArgumentError(String field, String desc) {
+ return StatusProto.toStatusRuntimeException(invalidArgumentStatus(field, desc));
+ }
+
+ private static com.google.rpc.Status invalidArgumentStatus(String field, String desc) {
+ FieldViolation v = FieldViolation.newBuilder().setField(field).setDescription(desc).build();
+ return Status.newBuilder()
+ .setCode(Code.INVALID_ARGUMENT.getNumber())
+ .setMessage("invalid argument(s): " + field + ": " + desc)
+ .addDetails(Any.pack(BadRequest.newBuilder().addFieldViolations(v).build()))
+ .build();
+ }
+ class CasServer extends ContentAddressableStorageImplBase {
@Override
- public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> responseObserver) {
- CasLookupReply.Builder reply = CasLookupReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- for (ContentDigest digest : request.getDigestList()) {
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ FindMissingBlobsResponse.Builder response = FindMissingBlobsResponse.newBuilder();
+ for (Digest digest : request.getBlobDigestsList()) {
if (!cache.containsKey(digest)) {
- status.addMissingDigest(digest);
+ response.addMissingBlobDigests(digest);
}
}
- if (status.getMissingDigestCount() > 0) {
- status.setSucceeded(false);
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- } else {
- status.setSucceeded(true);
- }
- responseObserver.onNext(reply.build());
+ responseObserver.onNext(response.build());
responseObserver.onCompleted();
}
@Override
- public void uploadTreeMetadata(
- CasUploadTreeMetadataRequest request,
- StreamObserver<CasUploadTreeMetadataReply> responseObserver) {
- try {
- for (FileNode treeNode : request.getTreeNodeList()) {
- cache.uploadBlob(treeNode.toByteArray());
+ public void batchUpdateBlobs(
+ BatchUpdateBlobsRequest request,
+ StreamObserver<BatchUpdateBlobsResponse> responseObserver) {
+ BatchUpdateBlobsResponse.Builder batchResponse = BatchUpdateBlobsResponse.newBuilder();
+ for (UpdateBlobRequest r : request.getRequestsList()) {
+ BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder();
+ try {
+ Digest digest = cache.uploadBlob(r.getData().toByteArray());
+ if (!r.getContentDigest().equals(digest)) {
+ String err =
+ "Upload digest " + r.getContentDigest() + " did not match data digest: " + digest;
+ resp.setStatus(invalidArgumentStatus("content_digest", err));
+ continue;
+ }
+ resp.getStatusBuilder().setCode(Code.OK.getNumber());
+ } catch (Exception e) {
+ resp.setStatus(internalErrorStatus(e));
}
- responseObserver.onNext(
- CasUploadTreeMetadataReply.newBuilder()
- .setStatus(CasStatus.newBuilder().setSucceeded(true))
- .build());
- } catch (Exception e) {
- LOG.warning("Request failed: " + e.toString());
- CasUploadTreeMetadataReply.Builder reply = CasUploadTreeMetadataReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(CasStatus.ErrorCode.UNKNOWN)
- .setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
- } finally {
- responseObserver.onCompleted();
}
+ responseObserver.onNext(batchResponse.build());
+ responseObserver.onCompleted();
}
+ }
+ class ByteStreamServer extends ByteStreamImplBase {
@Override
- public void downloadBlob(
- CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> responseObserver) {
- CasDownloadReply.Builder reply = CasDownloadReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- for (ContentDigest digest : request.getDigestList()) {
- if (!cache.containsKey(digest)) {
- status.addMissingDigest(digest);
- }
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ Digest digest = parseDigestFromResourceName(request.getResourceName());
+ if (digest == null) {
+ responseObserver.onError(
+ invalidArgumentError(
+ "resource_name",
+ "Failed parsing digest from resource_name:" + request.getResourceName()));
}
- if (status.getMissingDigestCount() > 0) {
- status.setSucceeded(false);
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- responseObserver.onNext(reply.build());
- responseObserver.onCompleted();
+ if (!cache.containsKey(digest)) {
+ responseObserver.onError(notFoundError(digest));
return;
}
- status.setSucceeded(true);
try {
- // This still relies on the total blob size to be small enough to fit in memory
- // simultaneously! TODO(olaola): refactor to fix this if the need arises.
- Chunker.Builder b = new Chunker.Builder().chunkSize(remoteOptions.grpcMaxChunkSizeBytes);
- for (ContentDigest digest : request.getDigestList()) {
- b.addInput(cache.downloadBlob(digest));
- }
- Chunker c = b.build();
+ // This still relies on the blob size to be small enough to fit in memory.
+ // TODO(olaola): refactor to fix this if the need arises.
+ Chunker c = Chunker.from(cache.downloadBlob(digest));
while (c.hasNext()) {
- reply.setData(c.next());
- responseObserver.onNext(reply.build());
- if (reply.hasStatus()) {
- reply.clearStatus(); // Only send status on first chunk.
- }
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFrom(c.next().getData())).build());
}
- } catch (IOException e) {
- // This cannot happen, as we are chunking in-memory blobs.
- throw new RuntimeException("Internal error: " + e);
+ responseObserver.onCompleted();
} catch (CacheNotFoundException e) {
// This can only happen if an item gets evicted right after we check.
- reply.clearData();
- status.setSucceeded(false);
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- status.addMissingDigest(e.getMissingDigest());
- responseObserver.onNext(reply.build());
- } finally {
- responseObserver.onCompleted();
+ responseObserver.onError(notFoundError(digest));
+ } catch (Exception e) {
+ LOG.warning("Read request failed: " + e);
+ responseObserver.onError(internalError(e));
}
}
@Override
- public StreamObserver<CasUploadBlobRequest> uploadBlob(
- final StreamObserver<CasUploadBlobReply> responseObserver) {
- return new StreamObserver<CasUploadBlobRequest>() {
+ public StreamObserver<WriteRequest> write(
+ final StreamObserver<WriteResponse> responseObserver) {
+ return new StreamObserver<WriteRequest>() {
byte[] blob = null;
- ContentDigest digest = null;
+ Digest digest = null;
long offset = 0;
+ String resourceName = null;
+ boolean closed = false;
@Override
- public void onNext(CasUploadBlobRequest request) {
- BlobChunk chunk = request.getData();
- try {
- if (chunk.hasDigest()) {
- // Check if the previous chunk was really done.
- Preconditions.checkArgument(
- digest == null || offset == 0,
- "Missing input chunk for digest %s",
- digest == null ? "" : ContentDigests.toString(digest));
- digest = chunk.getDigest();
- // This unconditionally downloads the whole blob into memory!
- Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES);
- blob = new byte[(int) digest.getSizeBytes()];
- }
- Preconditions.checkArgument(digest != null, "First chunk contains no digest");
- Preconditions.checkArgument(
- offset == chunk.getOffset(),
- "Missing input chunk for digest %s",
- ContentDigests.toString(digest));
- if (digest.getSizeBytes() > 0) {
- chunk.getData().copyTo(blob, (int) offset);
- offset = (offset + chunk.getData().size()) % digest.getSizeBytes();
- }
- if (offset == 0) {
- ContentDigest uploadedDigest = cache.uploadBlob(blob);
- Preconditions.checkArgument(
- uploadedDigest.equals(digest),
- "Digest mismatch: client sent %s, server computed %s",
- ContentDigests.toString(digest),
- ContentDigests.toString(uploadedDigest));
- }
- } catch (Exception e) {
- LOG.warning("Request failed: " + e.toString());
- CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(
- e instanceof IllegalArgumentException
- ? CasStatus.ErrorCode.INVALID_ARGUMENT
- : CasStatus.ErrorCode.UNKNOWN)
- .setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
+ public void onNext(WriteRequest request) {
+ if (closed) {
+ return;
+ }
+ if (digest == null) {
+ resourceName = request.getResourceName();
+ digest = parseDigestFromResourceName(resourceName);
+ blob = new byte[(int) digest.getSizeBytes()];
+ }
+ if (digest == null) {
+ responseObserver.onError(
+ invalidArgumentError(
+ "resource_name",
+ "Failed parsing digest from resource_name:" + request.getResourceName()));
+ closed = true;
+ return;
+ }
+ if (request.getWriteOffset() != offset) {
+ responseObserver.onError(
+ invalidArgumentError(
+ "write_offset",
+ "Expected:" + offset + ", received: " + request.getWriteOffset()));
+ closed = true;
+ return;
+ }
+ if (!request.getResourceName().isEmpty()
+ && !request.getResourceName().equals(resourceName)) {
+ responseObserver.onError(
+ invalidArgumentError(
+ "resource_name",
+ "Expected:" + resourceName + ", received: " + request.getResourceName()));
+ closed = true;
+ return;
+ }
+ long size = request.getData().size();
+ if (size > 0) {
+ request.getData().copyTo(blob, (int) offset);
+ offset += size;
+ }
+ boolean shouldFinishWrite = offset == digest.getSizeBytes();
+ if (shouldFinishWrite != request.getFinishWrite()) {
+ responseObserver.onError(
+ invalidArgumentError(
+ "finish_write",
+ "Expected:" + shouldFinishWrite + ", received: " + request.getFinishWrite()));
+ closed = true;
}
}
@Override
public void onError(Throwable t) {
- LOG.warning("Request errored remotely: " + t);
+ LOG.warning("Write request errored remotely: " + t);
+ closed = true;
}
@Override
public void onCompleted() {
- responseObserver.onCompleted();
+ if (closed) {
+ return;
+ }
+ if (digest == null || offset != digest.getSizeBytes()) {
+ responseObserver.onError(
+ StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.FAILED_PRECONDITION.getNumber())
+ .setMessage("Request completed before all data was sent.")
+ .build()));
+ closed = true;
+ return;
+ }
+ try {
+ Digest d = cache.uploadBlob(blob);
+ if (!d.equals(digest)) {
+ String err = "Received digest " + digest + " does not match computed digest " + d;
+ responseObserver.onError(invalidArgumentError("resource_name", err));
+ closed = true;
+ return;
+ }
+ responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(offset).build());
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ LOG.warning("Write request failed: " + e);
+ responseObserver.onError(internalError(e));
+ closed = true;
+ }
}
};
}
}
- class ExecutionCacheServer extends ExecutionCacheServiceImplBase {
+ class ActionCacheServer extends ActionCacheImplBase {
@Override
- public void getCachedResult(
- ExecutionCacheRequest request, StreamObserver<ExecutionCacheReply> responseObserver) {
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
try {
- ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
- ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
+ ActionKey actionKey = Digests.unsafeActionKeyFromDigest(request.getActionDigest());
ActionResult result = cache.getCachedActionResult(actionKey);
- if (result != null) {
- reply.setResult(result);
+ if (result == null) {
+ responseObserver.onError(notFoundError(request.getActionDigest()));
+ return;
}
- reply.getStatusBuilder().setSucceeded(true);
- responseObserver.onNext(reply.build());
- } catch (Exception e) {
- LOG.warning("getCachedActionResult request failed: " + e.toString());
- ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
- responseObserver.onNext(reply.build());
- } finally {
+ responseObserver.onNext(result);
responseObserver.onCompleted();
+ } catch (Exception e) {
+ LOG.warning("getActionResult request failed: " + e);
+ responseObserver.onError(internalError(e));
}
}
@Override
- public void setCachedResult(
- ExecutionCacheSetRequest request, StreamObserver<ExecutionCacheSetReply> responseObserver) {
+ public void updateActionResult(
+ UpdateActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
try {
- ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
- cache.setCachedActionResult(actionKey, request.getResult());
- ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
- reply.getStatusBuilder().setSucceeded(true);
- responseObserver.onNext(reply.build());
- } catch (Exception e) {
- LOG.warning("setCachedActionResult request failed: " + e.toString());
- ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
- responseObserver.onNext(reply.build());
- } finally {
+ ActionKey actionKey = Digests.unsafeActionKeyFromDigest(request.getActionDigest());
+ cache.setCachedActionResult(actionKey, request.getActionResult());
+ responseObserver.onNext(request.getActionResult());
responseObserver.onCompleted();
+ } catch (Exception e) {
+ LOG.warning("updateActionResult request failed: " + e);
+ responseObserver.onError(internalError(e));
}
}
}
@@ -363,23 +402,26 @@ public class RemoteWorker {
// How long to wait for the uid command.
private static final Duration uidTimeout = Durations.fromMicros(30);
- class ExecutionServer extends ExecuteServiceImplBase {
+ class ExecutionServer extends ExecutionImplBase {
private final Path workPath;
//The name of the container image entry in the Platform proto
- // (see src/main/protobuf/remote_protocol.proto and
+ // (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and
// experimental_remote_platform_override in
// src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java)
public static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image";
+ private static final int LOCAL_EXEC_ERROR = -1;
+
public ExecutionServer(Path workPath) {
this.workPath = workPath;
}
- private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) {
+ private Map<String, String> getEnvironmentVariables(
+ com.google.devtools.remoteexecution.v1test.Command command) {
HashMap<String, String> result = new HashMap<>();
- for (EnvironmentEntry entry : command.getEnvironmentList()) {
- result.put(entry.getVariable(), entry.getValue());
+ for (EnvironmentVariable v : command.getEnvironmentVariablesList()) {
+ result.put(v.getName(), v.getValue());
}
return result;
}
@@ -412,17 +454,14 @@ public class RemoteWorker {
// null. Otherwise returns docker container name from the parameters.
private String dockerContainer(Action action) throws IllegalArgumentException {
String result = null;
- List<Platform.Property> entries = action.getPlatform().getEntryList();
-
- for (Platform.Property entry : entries) {
- if (entry.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
- if (result == null) {
- result = entry.getValue();
- } else {
+ for (Platform.Property property : action.getPlatform().getPropertiesList()) {
+ if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
+ if (result != null) {
// Multiple container name entries
throw new IllegalArgumentException(
"Multiple entries for " + CONTAINER_IMAGE_ENTRY_NAME + " in action.Platform");
}
+ result = property.getValue();
}
}
return result;
@@ -479,38 +518,34 @@ public class RemoteWorker {
new File(pathString));
}
- public ExecuteReply execute(Action action, Path execRoot)
- throws IOException, InterruptedException, IllegalArgumentException {
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
- RemoteProtocol.Command command;
- try {
- command =
- RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest()));
- cache.downloadTree(action.getInputRootDigest(), execRoot);
- } catch (CacheNotFoundException e) {
- LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest()));
- return ExecuteReply.newBuilder()
- .setCasError(
- CasStatus.newBuilder()
- .setSucceeded(false)
- .addMissingDigest(e.getMissingDigest())
- .setError(CasStatus.ErrorCode.MISSING_DIGEST)
- .setErrorDetail(e.toString()))
- .setStatus(
- ExecutionStatus.newBuilder()
- .setExecuted(false)
- .setSucceeded(false)
- .setError(
- e.getMissingDigest() == action.getCommandDigest()
- ? ExecutionStatus.ErrorCode.MISSING_COMMAND
- : ExecutionStatus.ErrorCode.MISSING_INPUT)
- .setErrorDetail(e.toString()))
- .build();
+ static final int MAX_BLOB_SIZE_FOR_INLINE = 1024 * 10;
+
+ private void passOutErr(byte[] stdout, byte[] stderr, ActionResult.Builder result)
+ throws InterruptedException {
+ if (stdout.length <= MAX_BLOB_SIZE_FOR_INLINE) {
+ result.setStdoutRaw(ByteString.copyFrom(stdout));
+ } else if (stdout.length > 0) {
+ result.setStdoutDigest(cache.uploadBlob(stdout));
+ }
+ if (stderr.length <= MAX_BLOB_SIZE_FOR_INLINE) {
+ result.setStderrRaw(ByteString.copyFrom(stderr));
+ } else if (stderr.length > 0) {
+ result.setStderrDigest(cache.uploadBlob(stderr));
}
+ }
- List<Path> outputs = new ArrayList<>(action.getOutputPathList().size());
- for (String output : action.getOutputPathList()) {
+ public ActionResult execute(Action action, Path execRoot)
+ throws IOException, InterruptedException, IllegalArgumentException, CacheNotFoundException {
+ ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+ ActionResult.Builder result = ActionResult.newBuilder();
+ com.google.devtools.remoteexecution.v1test.Command command =
+ com.google.devtools.remoteexecution.v1test.Command.parseFrom(
+ cache.downloadBlob(action.getCommandDigest()));
+ cache.downloadTree(action.getInputRootDigest(), execRoot);
+
+ List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size());
+ for (String output : action.getOutputFilesList()) {
Path file = execRoot.getRelative(output);
if (file.exists()) {
throw new FileAlreadyExistsException("Output file already exists: " + file);
@@ -518,63 +553,58 @@ public class RemoteWorker {
FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
outputs.add(file);
}
+ // TODO(olaola): support output directories.
// TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that
// implementation instead of copying it.
- // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet.
- int timeoutSeconds = 60 * 15;
Command cmd =
getCommand(
action,
- command.getArgvList().toArray(new String[] {}),
+ command.getArgumentsList().toArray(new String[] {}),
getEnvironmentVariables(command),
execRoot.getPathString());
-
long startTime = System.currentTimeMillis();
- CommandResult cmdResult;
+ CommandResult cmdResult = null;
try {
cmdResult = cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true);
} catch (AbnormalTerminationException e) {
cmdResult = e.getResult();
} catch (CommandException e) {
- // At the time this comment was written, this must be a ExecFailedException encapsulating an
- // IOException from the underlying Subprocess.Factory.
- LOG.warning("Execution failed for " + command.getArgvList());
- return ExecuteReply.newBuilder()
- .setResult(
- ActionResult.newBuilder()
- .setReturnCode(LOCAL_EXEC_ERROR))
- .setStatus(
- ExecutionStatus.newBuilder()
- .setExecuted(false)
- .setSucceeded(false)
- .setError(ExecutionStatus.ErrorCode.EXEC_FAILED)
- .setErrorDetail(e.toString()))
- .build();
+ // At the time this comment was written, this must be a ExecFailedException encapsulating
+ // an IOException from the underlying Subprocess.Factory.
+ }
+ final int timeoutSeconds = 60 * 15;
+ // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet.
+ boolean wasTimeout =
+ cmdResult != null && cmdResult.getTerminationStatus().timedout()
+ || wasTimeout(timeoutSeconds, System.currentTimeMillis() - startTime);
+ int exitCode;
+ if (wasTimeout) {
+ final String errMessage =
+ "Command:\n"
+ + command.getArgumentsList()
+ + "\nexceeded deadline of "
+ + timeoutSeconds
+ + "seconds";
+ LOG.warning(errMessage);
+ throw StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.DEADLINE_EXCEEDED.getNumber())
+ .setMessage(errMessage)
+ .build());
+ } else if (cmdResult == null) {
+ exitCode = LOCAL_EXEC_ERROR;
+ } else {
+ exitCode = cmdResult.getTerminationStatus().getRawExitCode();
+ }
+
+ passOutErr(stdout.toByteArray(), stderr.toByteArray(), result);
+ cache.uploadAllResults(execRoot, outputs, result);
+ ActionResult finalResult = result.setExitCode(exitCode).build();
+ if (exitCode == 0) {
+ cache.setCachedActionResult(Digests.computeActionKey(action), finalResult);
}
- long wallTime = System.currentTimeMillis() - startTime;
- boolean wasTimeout = cmdResult.getTerminationStatus().timedout()
- || wasTimeout(timeoutSeconds, wallTime);
- Status status = wasTimeout ? Status.TIMEOUT : Status.SUCCESS;
- int exitCode = status == Status.TIMEOUT
- ? SIGALRM_EXIT_CODE
- : cmdResult.getTerminationStatus().getRawExitCode();
-
- ImmutableList<ContentDigest> outErrDigests =
- cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
- ContentDigest stdoutDigest = outErrDigests.get(0);
- ContentDigest stderrDigest = outErrDigests.get(1);
- ActionResult.Builder actionResult =
- ActionResult.newBuilder()
- .setReturnCode(exitCode)
- .setStdoutDigest(stdoutDigest)
- .setStderrDigest(stderrDigest);
- cache.uploadAllResults(execRoot, outputs, actionResult);
- cache.setCachedActionResult(ContentDigests.computeActionKey(action), actionResult.build());
- return ExecuteReply.newBuilder()
- .setResult(actionResult)
- .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true))
- .build();
+ return finalResult;
}
private boolean wasTimeout(int timeoutSeconds, long wallTimeMillis) {
@@ -582,7 +612,7 @@ public class RemoteWorker {
}
@Override
- public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) {
+ public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString());
try {
tempRoot.createDirectory();
@@ -591,33 +621,42 @@ public class RemoteWorker {
"Work received has "
+ request.getTotalInputFileCount()
+ " input files and "
- + request.getAction().getOutputPathCount()
+ + request.getAction().getOutputFilesCount()
+ " output files.");
}
- ExecuteReply reply = execute(request.getAction(), tempRoot);
- responseObserver.onNext(reply);
+ ActionResult result = execute(request.getAction(), tempRoot);
+ responseObserver.onNext(
+ Operation.newBuilder()
+ .setDone(true)
+ .setResponse(Any.pack(ExecuteResponse.newBuilder().setResult(result).build()))
+ .build());
+ responseObserver.onCompleted();
if (workerOptions.debug) {
- if (!reply.getStatus().getSucceeded()) {
- LOG.warning("Work failed. Request: " + request.toString() + ".");
- } else if (LOG_FINER) {
- LOG.fine("Work completed.");
- }
- }
- if (!workerOptions.debug) {
- FileSystemUtils.deleteTree(tempRoot);
+ LOG.fine("Work completed.");
+ LOG.warning("Preserving work directory " + tempRoot);
} else {
- LOG.warning("Preserving work directory " + tempRoot.toString() + ".");
+ FileSystemUtils.deleteTree(tempRoot);
}
- } catch (IOException | InterruptedException e) {
- LOG.log(Level.SEVERE, "Failure", e);
- ExecuteReply.Builder reply = ExecuteReply.newBuilder();
- reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
+ } catch (CacheNotFoundException e) {
+ LOG.warning("Cache miss on " + e.getMissingDigest());
+ responseObserver.onError(notFoundError(e.getMissingDigest()));
+ } catch (StatusRuntimeException e) {
+ responseObserver.onError(e);
+ } catch (IllegalArgumentException e) {
+ responseObserver.onError(
+ StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.INVALID_ARGUMENT.getNumber())
+ .setMessage(e.toString())
+ .build()));
+ } catch (Exception e) {
+ StringWriter stringWriter = new StringWriter();
+ e.printStackTrace(new PrintWriter(stringWriter));
+ LOG.log(Level.SEVERE, "Work failed: " + e + stringWriter.toString());
+ responseObserver.onError(internalError(e));
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- } finally {
- responseObserver.onCompleted();
}
}
}