aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorGravatar olaola <olaola@google.com>2017-06-09 04:33:25 +0200
committerGravatar Jakob Buchgraber <buchgr@google.com>2017-06-09 10:23:23 +0200
commit460a105ea61c23b1553a89d8f7a14170ad359e08 (patch)
tree066f0f98f03536efad02ab5ad0e64d9d4c5e054f
parent88677dbef8a9a1f88a35229f342fdf8523273456 (diff)
Also refactored away the various *Interface* files, no need since unit testing can be done with mocking the appropriate gRPC Impl classes directly (see tests). This also fixes the RemoteSpawnRunner, which should use different objects for remote caching and remote execution, the same way RemoteSpawnStrategy does. RELNOTES: n/a PiperOrigin-RevId: 158473700
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/BUILD9
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java10
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java102
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java24
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/Chunker.java218
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/Digests.java (renamed from src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java)61
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java603
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java43
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java27
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java131
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java80
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/README.md4
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java29
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java32
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java136
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java163
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java3
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java115
-rw-r--r--src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java138
-rw-r--r--src/main/protobuf/BUILD9
-rw-r--r--src/main/protobuf/remote_protocol.proto329
-rw-r--r--src/test/java/com/google/devtools/build/lib/BUILD9
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java236
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java43
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java21
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java56
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java629
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java383
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java143
-rw-r--r--src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java70
-rwxr-xr-xsrc/test/shell/bazel/remote_execution_test.sh2
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD10
-rw-r--r--src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java649
34 files changed, 2029 insertions, 2517 deletions
diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD
index 3a3c304d1e..d2e82efa60 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD
@@ -23,8 +23,6 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/standalone",
"//src/main/java/com/google/devtools/common/options",
- "//src/main/protobuf:remote_protocol_java_grpc",
- "//src/main/protobuf:remote_protocol_java_proto",
"//third_party:apache_httpclient",
"//third_party:apache_httpcore",
"//third_party:auth",
@@ -35,6 +33,13 @@ java_library(
"//third_party:netty",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
+ "//third_party/protobuf:protobuf_java_util",
+ "@googleapis//:google_bytestream_bytestream_java_grpc",
+ "@googleapis//:google_bytestream_bytestream_java_proto",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
+ "@googleapis//:google_longrunning_operations_java_proto",
+ "@googleapis//:google_rpc_error_details_java_proto",
],
)
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
index 5820e0bf95..cffc58fd4b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/CacheNotFoundException.java
@@ -14,25 +14,25 @@
package com.google.devtools.build.lib.remote;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.remoteexecution.v1test.Digest;
/**
* An exception to indicate cache misses.
* TODO(olaola): have a class of checked RemoteCacheExceptions.
*/
public final class CacheNotFoundException extends Exception {
- private final ContentDigest missingDigest;
+ private final Digest missingDigest;
- CacheNotFoundException(ContentDigest missingDigest) {
+ CacheNotFoundException(Digest missingDigest) {
this.missingDigest = missingDigest;
}
- public ContentDigest getMissingDigest() {
+ public Digest getMissingDigest() {
return missingDigest;
}
@Override
public String toString() {
- return "Missing digest: " + ContentDigests.toString(missingDigest);
+ return "Missing digest: " + missingDigest;
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java
index 87da7c2405..62605e1c2d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunner.java
@@ -13,28 +13,27 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.Spawn;
+import com.google.devtools.build.lib.actions.Spawns;
import com.google.devtools.build.lib.actions.UserExecException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.exec.SpawnResult;
import com.google.devtools.build.lib.exec.SpawnResult.Status;
import com.google.devtools.build.lib.exec.SpawnRunner;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.StatusRuntimeException;
@@ -56,14 +55,11 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
// TODO(olaola): This will be set on a per-action basis instead.
private final Platform platform;
- private final RemoteActionCache actionCache;
+ private final RemoteActionCache remoteCache;
private final SpawnRunner delegate;
CachedLocalSpawnRunner(
- Path execRoot,
- RemoteOptions options,
- RemoteActionCache actionCache,
- SpawnRunner delegate) {
+ Path execRoot, RemoteOptions options, RemoteActionCache remoteCache, SpawnRunner delegate) {
this.execRoot = execRoot;
this.options = options;
if (options.experimentalRemotePlatformOverride != null) {
@@ -77,15 +73,13 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
} else {
platform = null;
}
- this.actionCache = actionCache;
+ this.remoteCache = remoteCache;
this.delegate = delegate;
}
@Override
- public SpawnResult exec(
- Spawn spawn,
- SpawnExecutionPolicy policy)
- throws InterruptedException, IOException, ExecException {
+ public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy)
+ throws InterruptedException, IOException, ExecException {
ActionKey actionKey = null;
String mnemonic = spawn.getMnemonic();
@@ -100,24 +94,26 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
Action action =
buildAction(
spawn.getOutputFiles(),
- ContentDigests.computeDigest(command),
- repository.getMerkleDigest(inputRoot));
+ Digests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot),
+ // TODO(olaola): set sensible local and remote timeouts.
+ Spawns.getTimeoutSeconds(spawn, 120));
// Look up action cache, and reuse the action output if it is found.
- actionKey = ContentDigests.computeActionKey(action);
+ actionKey = Digests.computeActionKey(action);
ActionResult result =
- this.options.remoteAcceptCached ? actionCache.getCachedActionResult(actionKey) : null;
+ this.options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try {
// TODO(ulfjack): Download stdout, stderr, and the output files in a single call.
- actionCache.downloadAllResults(result, execRoot);
+ remoteCache.downloadAllResults(result, execRoot);
passRemoteOutErr(result, policy.getFileOutErr());
return new SpawnResult.Builder()
.setStatus(Status.SUCCESS)
- .setExitCode(result.getReturnCode())
+ .setExitCode(result.getExitCode())
.build();
} catch (CacheNotFoundException e) {
// TODO(ulfjack): Track down who throws this exception in what cases and double-check that
@@ -138,44 +134,62 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
}
private Action buildAction(
- Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Collection<? extends ActionInput> outputs,
+ Digest command,
+ Digest inputRoot,
+ long timeoutSeconds) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
// Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
for (ActionInput output : outputs) {
- action.addOutputPath(output.getExecPathString());
+ // TODO: output directories should be handled here, when they are supported.
+ action.addOutputFiles(output.getExecPathString());
}
if (platform != null) {
action.setPlatform(platform);
}
+ action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds));
return action.build();
}
- private static Command buildCommand(
- List<String> arguments, ImmutableMap<String, String> environment) {
+ private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
Command.Builder command = Command.newBuilder();
- command.addAllArgv(arguments);
+ command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(environment.keySet());
for (String var : variables) {
- command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var));
}
return command.build();
}
- private void passRemoteOutErr(
- ActionResult result, FileOutErr outErr)
- throws CacheNotFoundException {
- ImmutableList<byte[]> streams =
- actionCache.downloadBlobs(
- ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
- outErr.printOut(new String(streams.get(0), UTF_8));
- outErr.printErr(new String(streams.get(1), UTF_8));
+ private void passRemoteOutErr(ActionResult result, FileOutErr outErr) throws IOException {
+ try {
+ if (!result.getStdoutRaw().isEmpty()) {
+ result.getStdoutRaw().writeTo(outErr.getOutputStream());
+ outErr.getOutputStream().flush();
+ } else if (result.hasStdoutDigest()) {
+ byte[] stdoutBytes = remoteCache.downloadBlob(result.getStdoutDigest());
+ outErr.getOutputStream().write(stdoutBytes);
+ outErr.getOutputStream().flush();
+ }
+ if (!result.getStderrRaw().isEmpty()) {
+ result.getStderrRaw().writeTo(outErr.getErrorStream());
+ outErr.getErrorStream().flush();
+ } else if (result.hasStderrDigest()) {
+ byte[] stderrBytes = remoteCache.downloadBlob(result.getStderrDigest());
+ outErr.getErrorStream().write(stderrBytes);
+ outErr.getErrorStream().flush();
+ }
+ } catch (CacheNotFoundException e) {
+ outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss.");
+ outErr.getOutputStream().flush();
+ }
}
private void writeCacheEntry(Spawn spawn, FileOutErr outErr, ActionKey actionKey)
- throws IOException, InterruptedException {
+ throws IOException, InterruptedException {
ArrayList<Path> outputFiles = new ArrayList<>();
for (ActionInput output : spawn.getOutputFiles()) {
Path outputPath = execRoot.getRelative(output.getExecPathString());
@@ -186,11 +200,11 @@ final class CachedLocalSpawnRunner implements SpawnRunner {
}
}
ActionResult.Builder result = ActionResult.newBuilder();
- actionCache.uploadAllResults(execRoot, outputFiles, result);
- ContentDigest stderr = actionCache.uploadFileContents(outErr.getErrorPath());
- ContentDigest stdout = actionCache.uploadFileContents(outErr.getOutputPath());
+ remoteCache.uploadAllResults(execRoot, outputFiles, result);
+ Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath());
+ Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath());
result.setStderrDigest(stderr);
result.setStdoutDigest(stdout);
- actionCache.setCachedActionResult(actionKey, result.build());
+ remoteCache.setCachedActionResult(actionKey, result.build());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
index c2654586bf..a78458d135 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/ChannelOptions.java
@@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
+import com.google.devtools.common.options.Options;
import io.grpc.CallCredentials;
import io.grpc.auth.MoreCallCredentials;
import io.grpc.netty.GrpcSslContexts;
@@ -33,24 +34,21 @@ import javax.net.ssl.SSLException;
/** Instantiate all authentication helpers from build options. */
@ThreadSafe
public final class ChannelOptions {
- private final int maxMessageSize;
private final boolean tlsEnabled;
private final SslContext sslContext;
private final String tlsAuthorityOverride;
private final CallCredentials credentials;
- private static final int CHUNK_MESSAGE_OVERHEAD = 1024;
+ public static final ChannelOptions DEFAULT = create(Options.getDefaults(AuthAndTLSOptions.class));
private ChannelOptions(
boolean tlsEnabled,
SslContext sslContext,
String tlsAuthorityOverride,
- CallCredentials credentials,
- int maxMessageSize) {
+ CallCredentials credentials) {
this.tlsEnabled = tlsEnabled;
this.sslContext = sslContext;
this.tlsAuthorityOverride = tlsAuthorityOverride;
this.credentials = credentials;
- this.maxMessageSize = maxMessageSize;
}
public boolean tlsEnabled() {
@@ -69,15 +67,10 @@ public final class ChannelOptions {
return sslContext;
}
- public int maxMessageSize() {
- return maxMessageSize;
- }
-
- public static ChannelOptions create(AuthAndTLSOptions options, int grpcMaxChunkSizeBytes) {
+ public static ChannelOptions create(AuthAndTLSOptions options) {
try {
return create(
options,
- grpcMaxChunkSizeBytes,
options.authCredentials != null
? new FileInputStream(options.authCredentials)
: null);
@@ -88,7 +81,8 @@ public final class ChannelOptions {
}
@VisibleForTesting
- public static ChannelOptions create(AuthAndTLSOptions options, int grpcMaxChunkSizeBytes,
+ public static ChannelOptions create(
+ AuthAndTLSOptions options,
@Nullable InputStream credentialsInputStream) {
boolean tlsEnabled = options.tlsEnabled;
SslContext sslContext = null;
@@ -118,11 +112,7 @@ public final class ChannelOptions {
"Failed initializing auth credentials for remote cache/execution " + e);
}
}
- final int maxMessageSize =
- Math.max(
- 4 * 1024 * 1024 /* GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE */,
- grpcMaxChunkSizeBytes + CHUNK_MESSAGE_OVERHEAD);
return new ChannelOptions(
- tlsEnabled, sslContext, tlsAuthorityOverride, credentials, maxMessageSize);
+ tlsEnabled, sslContext, tlsAuthorityOverride, credentials);
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
index a9bdd6dc13..0c407a29b7 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Chunker.java
@@ -14,114 +14,162 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableSet;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.protobuf.ByteString;
+import com.google.devtools.remoteexecution.v1test.Digest;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
-import javax.annotation.Nullable;
-/** An iterator-type object that transforms byte sources into a stream of BlobChunk messages. */
+/** An iterator-type object that transforms byte sources into a stream of Chunks. */
public final class Chunker {
+ // This is effectively final, should be changed only in unit-tests!
+ private static int DEFAULT_CHUNK_SIZE = 1024 * 16;
+ private static byte[] EMPTY_BLOB = new byte[0];
+
+ @VisibleForTesting
+ static void setDefaultChunkSizeForTesting(int value) {
+ DEFAULT_CHUNK_SIZE = value;
+ }
+
+ public static int getDefaultChunkSize() {
+ return DEFAULT_CHUNK_SIZE;
+ }
+
+ /** A piece of a byte[] blob. */
+ public static final class Chunk {
+
+ private final Digest digest;
+ private final long offset;
+ // TODO(olaola): consider saving data in a different format that byte[].
+ private final byte[] data;
+
+ @VisibleForTesting
+ public Chunk(Digest digest, byte[] data, long offset) {
+ this.digest = digest;
+ this.data = data;
+ this.offset = offset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof Chunk)) {
+ return false;
+ }
+ Chunk other = (Chunk) o;
+ return other.offset == offset
+ && other.digest.equals(digest)
+ && Arrays.equals(other.data, data);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(digest, offset, Arrays.hashCode(data));
+ }
+
+ public Digest getDigest() {
+ return digest;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ // This returns a mutable copy, for efficiency.
+ public byte[] getData() {
+ return data;
+ }
+ }
+
/** An Item is an opaque digestable source of bytes. */
interface Item {
- ContentDigest getDigest() throws IOException;
+ Digest getDigest() throws IOException;
InputStream getInputStream() throws IOException;
}
private final Iterator<Item> inputIterator;
private InputStream currentStream;
- private final Set<ContentDigest> digests;
- private ContentDigest digest;
+ private Digest digest;
private long bytesLeft;
private final int chunkSize;
- Chunker(
- Iterator<Item> inputIterator,
- int chunkSize,
- // If present, specifies which digests to output out of the whole input.
- @Nullable Set<ContentDigest> digests)
- throws IOException {
+ Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
Preconditions.checkArgument(chunkSize > 0, "Chunk size must be greater than 0");
- this.digests = digests;
this.inputIterator = inputIterator;
this.chunkSize = chunkSize;
advanceInput();
}
- Chunker(Iterator<Item> inputIterator, int chunkSize) throws IOException {
- this(inputIterator, chunkSize, null);
+ Chunker(Item input, int chunkSize) throws IOException {
+ this(Iterators.singletonIterator(input), chunkSize);
}
- Chunker(Item input, int chunkSize) throws IOException {
- this(Iterators.singletonIterator(input), chunkSize, ImmutableSet.of(input.getDigest()));
- }
-
- private void advanceInput() throws IOException {
- do {
- if (inputIterator != null && inputIterator.hasNext()) {
- Item input = inputIterator.next();
- digest = input.getDigest();
- currentStream = input.getInputStream();
- bytesLeft = digest.getSizeBytes();
- } else {
- digest = null;
- currentStream = null;
- bytesLeft = 0;
- }
- } while (digest != null && digests != null && !digests.contains(digest));
+ public void advanceInput() throws IOException {
+ if (inputIterator != null && inputIterator.hasNext()) {
+ Item input = inputIterator.next();
+ digest = input.getDigest();
+ currentStream = input.getInputStream();
+ bytesLeft = digest.getSizeBytes();
+ } else {
+ digest = null;
+ currentStream = null;
+ bytesLeft = 0;
+ }
}
- /** True if the object has more BlobChunk elements. */
+ /** True if the object has more Chunk elements. */
public boolean hasNext() {
return currentStream != null;
}
- /** Consume the next BlobChunk element. */
- public BlobChunk next() throws IOException {
+ /** Consume the next Chunk element. */
+ public Chunk next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
- BlobChunk.Builder chunk = BlobChunk.newBuilder();
- long offset = digest.getSizeBytes() - bytesLeft;
- if (offset == 0) {
- chunk.setDigest(digest);
- } else {
- chunk.setOffset(offset);
- }
+ final long offset = digest.getSizeBytes() - bytesLeft;
+ byte[] blob = EMPTY_BLOB;
if (bytesLeft > 0) {
- byte[] blob = new byte[(int) Math.min(bytesLeft, chunkSize)];
+ blob = new byte[(int) Math.min(bytesLeft, chunkSize)];
currentStream.read(blob);
- chunk.setData(ByteString.copyFrom(blob));
bytesLeft -= blob.length;
}
+ final Chunk result = new Chunk(digest, blob, offset);
if (bytesLeft == 0) {
currentStream.close();
advanceInput(); // Sets the current stream to null, if it was the last.
}
- return chunk.build();
+ return result;
}
static Item toItem(final byte[] blob) {
return new Item() {
+ Digest digest = null;
+
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.computeDigest(blob);
+ public Digest getDigest() throws IOException {
+ if (digest == null) {
+ digest = Digests.computeDigest(blob);
+ }
+ return digest;
}
@Override
@@ -133,9 +181,14 @@ public final class Chunker {
static Item toItem(final Path file) {
return new Item() {
+ Digest digest = null;
+
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.computeDigest(file);
+ public Digest getDigest() throws IOException {
+ if (digest == null) {
+ digest = Digests.computeDigest(file);
+ }
+ return digest;
}
@Override
@@ -152,8 +205,8 @@ public final class Chunker {
}
return new Item() {
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.getDigestFromInputCache(input, inputCache);
+ public Digest getDigest() throws IOException {
+ return Digests.getDigestFromInputCache(input, inputCache);
}
@Override
@@ -165,9 +218,14 @@ public final class Chunker {
static Item toItem(final VirtualActionInput input) {
return new Item() {
+ Digest digest = null;
+
@Override
- public ContentDigest getDigest() throws IOException {
- return ContentDigests.computeDigest(input);
+ public Digest getDigest() throws IOException {
+ if (digest == null) {
+ digest = Digests.computeDigest(input);
+ }
+ return digest;
}
@Override
@@ -189,27 +247,67 @@ public final class Chunker {
return new Chunker(toItem(input, inputCache, execRoot), chunkSize);
}
+ /**
+ * Create a Chunker from a given ActionInput, taking its digest from the provided
+ * ActionInputFileCache.
+ */
+ public static Chunker from(ActionInput input, ActionInputFileCache inputCache, Path execRoot)
+ throws IOException {
+ return from(input, getDefaultChunkSize(), inputCache, execRoot);
+ }
+
/** Create a Chunker from a given blob and chunkSize. */
public static Chunker from(byte[] blob, int chunkSize) throws IOException {
return new Chunker(toItem(blob), chunkSize);
}
+ /** Create a Chunker from a given blob. */
+ public static Chunker from(byte[] blob) throws IOException {
+ return from(blob, getDefaultChunkSize());
+ }
+
/** Create a Chunker from a given Path and chunkSize. */
public static Chunker from(Path file, int chunkSize) throws IOException {
return new Chunker(toItem(file), chunkSize);
}
+ /** Create a Chunker from a given Path. */
+ public static Chunker from(Path file) throws IOException {
+ return from(file, getDefaultChunkSize());
+ }
+
+ static class MemberOf implements Predicate<Item> {
+ private final Set<Digest> digests;
+
+ public MemberOf(Set<Digest> digests) {
+ this.digests = digests;
+ }
+
+ @Override
+ public boolean apply(Item item) {
+ try {
+ return digests.contains(item.getDigest());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
/**
* Create a Chunker from multiple input sources. The order of the sources provided to the Builder
* will be the same order they will be chunked by.
*/
public static final class Builder {
private final ArrayList<Item> items = new ArrayList<>();
- private Set<ContentDigest> digests = null;
- private int chunkSize = 0;
+ private Set<Digest> digests = null;
+ private int chunkSize = getDefaultChunkSize();
public Chunker build() throws IOException {
- return new Chunker(items.iterator(), chunkSize, digests);
+ return new Chunker(
+ digests == null
+ ? items.iterator()
+ : Iterators.filter(items.iterator(), new MemberOf(digests)),
+ chunkSize);
}
public Builder chunkSize(int chunkSize) {
@@ -221,7 +319,7 @@ public final class Chunker {
* Restricts the Chunker to use only inputs with these digests. This is an optimization for CAS
* uploads where a list of digests missing from the CAS is known.
*/
- public Builder onlyUseDigests(Set<ContentDigest> digests) {
+ public Builder onlyUseDigests(Set<Digest> digests) {
this.digests = digests;
return this;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java b/src/main/java/com/google/devtools/build/lib/remote/Digests.java
index 21a739c468..9121bc2782 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/ContentDigests.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/Digests.java
@@ -14,34 +14,35 @@
package com.google.devtools.build.lib.remote;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.protobuf.ByteString;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.Message;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-/** Helper methods relating to computing ContentDigest messages for remote execution. */
+/** Helper methods relating to computing Digest messages for remote execution. */
@ThreadSafe
-public final class ContentDigests {
- private ContentDigests() {}
+public final class Digests {
+ private Digests() {}
- public static ContentDigest computeDigest(byte[] blob) {
- return buildDigest(Hashing.sha1().hashBytes(blob).asBytes(), blob.length);
+ public static Digest computeDigest(byte[] blob) {
+ return buildDigest(Hashing.sha1().hashBytes(blob).toString(), blob.length);
}
- public static ContentDigest computeDigest(Path file) throws IOException {
+ public static Digest computeDigest(Path file) throws IOException {
return buildDigest(file.getSHA1Digest(), file.getFileSize());
}
- public static ContentDigest computeDigest(VirtualActionInput input) throws IOException {
+ public static Digest computeDigest(VirtualActionInput input) throws IOException {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
input.writeTo(buffer);
return computeDigest(buffer.toByteArray());
@@ -52,22 +53,26 @@ public final class ContentDigests {
* bytes, but this implementation relies on the stability of the proto encoding, in particular
* between different platforms and languages. TODO(olaola): upgrade to a better implementation!
*/
- public static ContentDigest computeDigest(Message message) {
+ public static Digest computeDigest(Message message) {
return computeDigest(message.toByteArray());
}
+ public static Digest computeDigestUtf8(String str) {
+ return computeDigest(str.getBytes(UTF_8));
+ }
+
/**
- * A special type of ContentDigest that is used only as a remote action cache key. This is a
- * separate type in order to prevent accidentally using other ContentDigests as action keys.
+ * A special type of Digest that is used only as a remote action cache key. This is a
+ * separate type in order to prevent accidentally using other Digests as action keys.
*/
public static final class ActionKey {
- private final ContentDigest digest;
+ private final Digest digest;
- public ContentDigest getDigest() {
+ public Digest getDigest() {
return digest;
}
- private ActionKey(ContentDigest digest) {
+ private ActionKey(Digest digest) {
this.digest = digest;
}
}
@@ -77,29 +82,23 @@ public final class ContentDigests {
}
/**
- * Assumes that the given ContentDigest is a valid digest of an Action, and creates an ActionKey
+ * Assumes that the given Digest is a valid digest of an Action, and creates an ActionKey
* wrapper. This should not be called on the client side!
*/
- public static ActionKey unsafeActionKeyFromDigest(ContentDigest digest) {
+ public static ActionKey unsafeActionKeyFromDigest(Digest digest) {
return new ActionKey(digest);
}
- public static ContentDigest buildDigest(byte[] digest, long size) {
- ContentDigest.Builder b = ContentDigest.newBuilder();
- b.setDigest(ByteString.copyFrom(digest)).setSizeBytes(size);
- return b.build();
+ public static Digest buildDigest(byte[] hash, long size) {
+ return buildDigest(HashCode.fromBytes(hash).toString(), size);
}
- public static ContentDigest getDigestFromInputCache(ActionInput input, ActionInputFileCache cache)
- throws IOException {
- return buildDigest(cache.getDigest(input), cache.getSizeInBytes(input));
+ public static Digest buildDigest(String hexHash, long size) {
+ return Digest.newBuilder().setHash(hexHash).setSizeBytes(size).build();
}
- public static String toHexString(ContentDigest digest) {
- return HashCode.fromBytes(digest.getDigest().toByteArray()).toString();
- }
-
- public static String toString(ContentDigest digest) {
- return "<digest: " + toHexString(digest) + ", size: " + digest.getSizeBytes() + " bytes>";
+ public static Digest getDigestFromInputCache(ActionInput input, ActionInputFileCache cache)
+ throws IOException {
+ return buildDigest(cache.getDigest(input), cache.getSizeInBytes(input));
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
index d85125f15c..d93842244d 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcActionCache.java
@@ -14,52 +14,53 @@
package com.google.devtools.build.lib.remote;
+import com.google.bytestream.ByteStreamGrpc;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamBlockingStub;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
-import com.google.devtools.build.lib.util.Pair;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheBlockingStub;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageBlockingStub;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
+import com.google.devtools.remoteexecution.v1test.OutputDirectory;
+import com.google.devtools.remoteexecution.v1test.OutputFile;
+import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -67,50 +68,76 @@ import java.util.concurrent.atomic.AtomicReference;
/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@ThreadSafe
public class GrpcActionCache implements RemoteActionCache {
- private static final int MAX_MEMORY_KBYTES = 512 * 1024;
-
- /** Channel over which to send gRPC CAS queries. */
- private final GrpcCasInterface casIface;
-
- private final GrpcExecutionCacheInterface iface;
private final RemoteOptions options;
-
- public GrpcActionCache(
- RemoteOptions options, GrpcCasInterface casIface, GrpcExecutionCacheInterface iface) {
- this.options = options;
- this.casIface = casIface;
- this.iface = iface;
- }
+ private final ChannelOptions channelOptions;
+ private final Channel channel;
@VisibleForTesting
- public GrpcActionCache(
- Channel channel, RemoteOptions options, ChannelOptions channelOptions) {
+ public GrpcActionCache(Channel channel, ChannelOptions channelOptions, RemoteOptions options) {
this.options = options;
- this.casIface =
- GrpcInterfaces.casInterface(options.remoteTimeout, channel, channelOptions);
- this.iface =
- GrpcInterfaces.executionCacheInterface(options.remoteTimeout, channel, channelOptions);
+ this.channelOptions = channelOptions;
+ this.channel = channel;
}
- public GrpcActionCache(RemoteOptions options, ChannelOptions channelOptions) {
- this(RemoteUtils.createChannel(options.remoteCache, channelOptions), options, channelOptions);
- }
+ // All gRPC stubs are reused.
+ private final Supplier<ContentAddressableStorageBlockingStub> casBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ContentAddressableStorageBlockingStub>() {
+ @Override
+ public ContentAddressableStorageBlockingStub get() {
+ return ContentAddressableStorageGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
+ private final Supplier<ByteStreamBlockingStub> bsBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ByteStreamBlockingStub>() {
+ @Override
+ public ByteStreamBlockingStub get() {
+ return ByteStreamGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
+ private final Supplier<ByteStreamStub> bsStub =
+ Suppliers.memoize(
+ new Supplier<ByteStreamStub>() {
+ @Override
+ public ByteStreamStub get() {
+ return ByteStreamGrpc.newStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
+
+ private final Supplier<ActionCacheBlockingStub> acBlockingStub =
+ Suppliers.memoize(
+ new Supplier<ActionCacheBlockingStub>() {
+ @Override
+ public ActionCacheBlockingStub get() {
+ return ActionCacheGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout, TimeUnit.SECONDS);
+ }
+ });
public static boolean isRemoteCacheOptions(RemoteOptions options) {
return options.remoteCache != null;
}
- private ImmutableSet<ContentDigest> getMissingDigests(Iterable<ContentDigest> digests) {
- CasLookupRequest.Builder request = CasLookupRequest.newBuilder().addAllDigest(digests);
- if (request.getDigestCount() == 0) {
+ private ImmutableSet<Digest> getMissingDigests(Iterable<Digest> digests) {
+ FindMissingBlobsRequest.Builder request =
+ FindMissingBlobsRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .addAllBlobDigests(digests);
+ if (request.getBlobDigestsCount() == 0) {
return ImmutableSet.of();
}
- CasStatus status = casIface.lookup(request.build()).getStatus();
- if (!status.getSucceeded() && status.getError() != CasStatus.ErrorCode.MISSING_DIGEST) {
- // TODO(olaola): here and below, add basic retry logic on transient errors!
- throw new RuntimeException(status.getErrorDetail());
- }
- return ImmutableSet.copyOf(status.getMissingDigestList());
+ FindMissingBlobsResponse response = casBlockingStub.get().findMissingBlobs(request.build());
+ return ImmutableSet.copyOf(response.getMissingBlobDigestsList());
}
/**
@@ -122,26 +149,37 @@ public class GrpcActionCache implements RemoteActionCache {
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
// TODO(olaola): avoid querying all the digests, only ask for novel subtrees.
- ImmutableSet<ContentDigest> missingDigests = getMissingDigests(repository.getAllDigests(root));
+ ImmutableSet<Digest> missingDigests = getMissingDigests(repository.getAllDigests(root));
// Only upload data that was missing from the cache.
ArrayList<ActionInput> actionInputs = new ArrayList<>();
- ArrayList<FileNode> treeNodes = new ArrayList<>();
+ ArrayList<Directory> treeNodes = new ArrayList<>();
repository.getDataFromDigests(missingDigests, actionInputs, treeNodes);
if (!treeNodes.isEmpty()) {
- CasUploadTreeMetadataRequest.Builder metaRequest =
- CasUploadTreeMetadataRequest.newBuilder().addAllTreeNode(treeNodes);
- CasUploadTreeMetadataReply reply = casIface.uploadTreeMetadata(metaRequest.build());
- if (!reply.getStatus().getSucceeded()) {
- throw new RuntimeException(reply.getStatus().getErrorDetail());
+ // TODO(olaola): split this into multiple requests if total size is > 10MB.
+ BatchUpdateBlobsRequest.Builder treeBlobRequest =
+ BatchUpdateBlobsRequest.newBuilder().setInstanceName(options.remoteInstanceName);
+ for (Directory d : treeNodes) {
+ final byte[] data = d.toByteArray();
+ treeBlobRequest
+ .addRequestsBuilder()
+ .setContentDigest(Digests.computeDigest(data))
+ .setData(ByteString.copyFrom(data));
+ }
+ BatchUpdateBlobsResponse response =
+ casBlockingStub.get().batchUpdateBlobs(treeBlobRequest.build());
+ // TODO(olaola): handle retries on transient errors.
+ for (BatchUpdateBlobsResponse.Response r : response.getResponsesList()) {
+ if (!Status.fromCodeValue(r.getStatus().getCode()).isOk()) {
+ throw StatusProto.toStatusRuntimeException(r.getStatus());
+ }
}
}
if (!actionInputs.isEmpty()) {
uploadChunks(
actionInputs.size(),
new Chunker.Builder()
- .chunkSize(options.grpcMaxChunkSizeBytes)
.addAllInputs(actionInputs, repository.getInputFileCache(), execRoot)
.onlyUseDigests(missingDigests)
.build());
@@ -152,21 +190,11 @@ public class GrpcActionCache implements RemoteActionCache {
* Download the entire tree data rooted by the given digest and write it into the given location.
*/
@Override
- public void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ public void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException {
throw new UnsupportedOperationException();
}
- private void handleDownloadStatus(CasStatus status) throws CacheNotFoundException {
- if (!status.getSucceeded()) {
- if (status.getError() == CasStatus.ErrorCode.MISSING_DIGEST) {
- throw new CacheNotFoundException(status.getMissingDigest(0));
- }
- // TODO(olaola): deal with other statuses better.
- throw new RuntimeException(status.getErrorDetail());
- }
- }
-
/**
* Download all results of a remotely executed action locally. TODO(olaola): will need to amend to
* include the {@link com.google.devtools.build.lib.remote.TreeNodeRepository} for updating.
@@ -174,106 +202,85 @@ public class GrpcActionCache implements RemoteActionCache {
@Override
public void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException {
- if (result.getOutputList().isEmpty()) {
+ if (result.getOutputFilesList().isEmpty() && result.getOutputDirectoriesList().isEmpty()) {
return;
}
- // Send all the file requests in a single synchronous batch.
- // TODO(olaola): profile to maybe replace with separate concurrent requests.
- CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder();
- Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap = new HashMap<>();
- for (Output output : result.getOutputList()) {
- Path path = execRoot.getRelative(output.getPath());
- if (output.getContentCase() == ContentCase.FILE_METADATA) {
- FileMetadata fileMetadata = output.getFileMetadata();
- ContentDigest digest = fileMetadata.getDigest();
- if (digest.getSizeBytes() > 0) {
- request.addDigest(digest);
- metadataMap.put(digest, Pair.of(path, fileMetadata));
- } else {
- // Handle empty file locally.
- FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
- FileSystemUtils.writeContent(path, new byte[0]);
- }
+ for (OutputFile file : result.getOutputFilesList()) {
+ Path path = execRoot.getRelative(file.getPath());
+ FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
+ Digest digest = file.getDigest();
+ if (digest.getSizeBytes() == 0) {
+ // Handle empty file locally.
+ FileSystemUtils.writeContent(path, new byte[0]);
} else {
- downloadTree(output.getDigest(), path);
+ try (OutputStream stream = path.getOutputStream()) {
+ if (!file.getContent().isEmpty()) {
+ file.getContent().writeTo(stream);
+ } else {
+ Iterator<ReadResponse> replies = readBlob(digest);
+ while (replies.hasNext()) {
+ replies.next().getData().writeTo(stream);
+ }
+ }
+ }
}
+ path.setExecutable(file.getIsExecutable());
}
- Iterator<CasDownloadReply> replies = casIface.downloadBlob(request.build());
- Set<ContentDigest> results = new HashSet<>();
- while (replies.hasNext()) {
- results.add(createFileFromStream(metadataMap, replies));
- }
- for (ContentDigest digest : metadataMap.keySet()) {
- if (!results.contains(digest)) {
- throw new CacheNotFoundException(digest);
- }
+ for (OutputDirectory directory : result.getOutputDirectoriesList()) {
+ downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath()));
}
}
- private ContentDigest createFileFromStream(
- Map<ContentDigest, Pair<Path, FileMetadata>> metadataMap, Iterator<CasDownloadReply> replies)
- throws IOException, CacheNotFoundException {
- Preconditions.checkArgument(replies.hasNext());
- CasDownloadReply reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
+ private Iterator<ReadResponse> readBlob(Digest digest) throws CacheNotFoundException {
+ String resourceName = "";
+ if (!options.remoteInstanceName.isEmpty()) {
+ resourceName += options.remoteInstanceName + "/";
}
- BlobChunk chunk = reply.getData();
- ContentDigest digest = chunk.getDigest();
- Preconditions.checkArgument(metadataMap.containsKey(digest));
- Pair<Path, FileMetadata> metadata = metadataMap.get(digest);
- Path path = metadata.first;
- FileSystemUtils.createDirectoryAndParents(path.getParentDirectory());
- try (OutputStream stream = path.getOutputStream()) {
- ByteString data = chunk.getData();
- data.writeTo(stream);
- long bytesLeft = digest.getSizeBytes() - data.size();
- while (bytesLeft > 0) {
- Preconditions.checkArgument(replies.hasNext());
- reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
- }
- chunk = reply.getData();
- data = chunk.getData();
- Preconditions.checkArgument(!chunk.hasDigest());
- Preconditions.checkArgument(chunk.getOffset() == digest.getSizeBytes() - bytesLeft);
- data.writeTo(stream);
- bytesLeft -= data.size();
+ resourceName += "blobs/" + digest.getHash() + "/" + digest.getSizeBytes();
+ try {
+ return bsBlockingStub
+ .get()
+ .read(ReadRequest.newBuilder().setResourceName(resourceName).build());
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
+ throw new CacheNotFoundException(digest);
}
- path.setExecutable(metadata.second.getExecutable());
+ throw e;
}
- return digest;
}
/** Upload all results of a locally executed action to the cache. */
@Override
public void uploadAllResults(Path execRoot, Collection<Path> files, ActionResult.Builder result)
throws IOException, InterruptedException {
- ArrayList<ContentDigest> digests = new ArrayList<>();
- Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes);
+ ArrayList<Digest> digests = new ArrayList<>();
+ Chunker.Builder b = new Chunker.Builder();
for (Path file : files) {
- digests.add(ContentDigests.computeDigest(file));
+ if (!file.exists()) {
+ // We ignore requested results that have not been generated by the action.
+ continue;
+ }
+ if (file.isDirectory()) {
+ // TODO(olaola): to implement this for a directory, will need to create or pass a
+ // TreeNodeRepository to call uploadTree.
+ throw new UnsupportedOperationException("Storing a directory is not yet supported.");
+ }
+ digests.add(Digests.computeDigest(file));
b.addInput(file);
}
- ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
+ ImmutableSet<Digest> missing = getMissingDigests(digests);
if (!missing.isEmpty()) {
uploadChunks(missing.size(), b.onlyUseDigests(missing).build());
}
int index = 0;
for (Path file : files) {
- if (file.isDirectory()) {
- // TODO(olaola): to implement this for a directory, will need to create or pass a
- // TreeNodeRepository to call uploadTree.
- throw new UnsupportedOperationException("Storing a directory is not yet supported.");
- }
// Add to protobuf.
+ // TODO(olaola): inline small results here.
result
- .addOutputBuilder()
+ .addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
- .getFileMetadataBuilder()
.setDigest(digests.get(index++))
- .setExecutable(file.isExecutable());
+ .setIsExecutable(file.isExecutable());
}
}
@@ -284,11 +291,11 @@ public class GrpcActionCache implements RemoteActionCache {
* @return The key for fetching the file contents blob from cache.
*/
@Override
- public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException {
- ContentDigest digest = ContentDigests.computeDigest(file);
- ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
+ public Digest uploadFileContents(Path file) throws IOException, InterruptedException {
+ Digest digest = Digests.computeDigest(file);
+ ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploadChunks(1, Chunker.from(file, options.grpcMaxChunkSizeBytes));
+ uploadChunks(1, Chunker.from(file));
}
return digest;
}
@@ -300,97 +307,84 @@ public class GrpcActionCache implements RemoteActionCache {
* @return The key for fetching the file contents blob from cache.
*/
@Override
- public ContentDigest uploadFileContents(
+ public Digest uploadFileContents(
ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException {
- ContentDigest digest = ContentDigests.getDigestFromInputCache(input, inputCache);
- ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
+ Digest digest = Digests.getDigestFromInputCache(input, inputCache);
+ ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
if (!missing.isEmpty()) {
- uploadChunks(1, Chunker.from(input, options.grpcMaxChunkSizeBytes, inputCache, execRoot));
+ uploadChunks(1, Chunker.from(input, inputCache, execRoot));
}
return digest;
}
- static class UploadBlobReplyStreamObserver implements StreamObserver<CasUploadBlobReply> {
- private final CountDownLatch finishLatch;
- private final AtomicReference<RuntimeException> exception;
-
- public UploadBlobReplyStreamObserver(
- CountDownLatch finishLatch, AtomicReference<RuntimeException> exception) {
- this.finishLatch = finishLatch;
- this.exception = exception;
- }
-
- @Override
- public void onNext(CasUploadBlobReply reply) {
- if (!reply.getStatus().getSucceeded()) {
- // TODO(olaola): add basic retry logic on transient errors!
- this.exception.compareAndSet(
- null, new RuntimeException(reply.getStatus().getErrorDetail()));
+ private void uploadChunks(int numItems, Chunker chunker)
+ throws InterruptedException, IOException {
+ final CountDownLatch finishLatch = new CountDownLatch(numItems);
+ final AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
+ StreamObserver<WriteRequest> requestObserver = null;
+ String resourceName = "";
+ if (!options.remoteInstanceName.isEmpty()) {
+ resourceName += options.remoteInstanceName + "/";
+ }
+ while (chunker.hasNext()) {
+ Chunker.Chunk chunk = chunker.next();
+ final Digest digest = chunk.getDigest();
+ long offset = chunk.getOffset();
+ WriteRequest.Builder request = WriteRequest.newBuilder();
+ if (offset == 0) { // Beginning of new upload.
+ numItems--;
+ request.setResourceName(
+ resourceName
+ + "uploads/"
+ + UUID.randomUUID()
+ + "/blobs/"
+ + digest.getHash()
+ + "/"
+ + digest.getSizeBytes());
+ // The batches execute simultaneously.
+ requestObserver =
+ bsStub
+ .get()
+ .write(
+ new StreamObserver<WriteResponse>() {
+ private long bytesLeft = digest.getSizeBytes();
+
+ @Override
+ public void onNext(WriteResponse reply) {
+ bytesLeft -= reply.getCommittedSize();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ exception.compareAndSet(
+ null, new StatusRuntimeException(Status.fromThrowable(t)));
+ finishLatch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ if (bytesLeft != 0) {
+ exception.compareAndSet(
+ null, new RuntimeException("Server did not commit all data."));
+ }
+ finishLatch.countDown();
+ }
+ });
}
- }
-
- @Override
- public void onError(Throwable t) {
- this.exception.compareAndSet(null, new StatusRuntimeException(Status.fromThrowable(t)));
- finishLatch.countDown();
- }
-
- @Override
- public void onCompleted() {
- finishLatch.countDown();
- }
- }
-
- private void uploadChunks(int numItems, Chunker blobs) throws InterruptedException, IOException {
- CountDownLatch finishLatch = new CountDownLatch(numItems); // Maximal number of batches.
- AtomicReference<RuntimeException> exception = new AtomicReference<>(null);
- UploadBlobReplyStreamObserver responseObserver = null;
- StreamObserver<CasUploadBlobRequest> requestObserver = null;
- int currentBatchBytes = 0;
- int batchedInputs = 0;
- int batches = 0;
- try {
- while (blobs.hasNext()) {
- BlobChunk chunk = blobs.next();
- if (chunk.hasDigest()) {
- // Determine whether to start next batch.
- final long batchSize = chunk.getDigest().getSizeBytes() + currentBatchBytes;
- if (batchedInputs % options.grpcMaxBatchInputs == 0
- || batchSize > options.grpcMaxBatchSizeBytes) {
- // The batches execute simultaneously.
- if (requestObserver != null) {
- batchedInputs = 0;
- currentBatchBytes = 0;
- requestObserver.onCompleted();
- }
- batches++;
- responseObserver = new UploadBlobReplyStreamObserver(finishLatch, exception);
- requestObserver = casIface.uploadBlobAsync(responseObserver);
- }
- batchedInputs++;
- }
- currentBatchBytes += chunk.getData().size();
- requestObserver.onNext(CasUploadBlobRequest.newBuilder().setData(chunk).build());
- if (finishLatch.getCount() == 0) {
- // RPC completed or errored before we finished sending.
- throw new RuntimeException(
- "gRPC terminated prematurely: "
- + (exception.get() != null ? exception.get() : "unknown cause"));
- }
+ byte[] data = chunk.getData();
+ boolean finishWrite = offset + data.length == digest.getSizeBytes();
+ request.setData(ByteString.copyFrom(data)).setWriteOffset(offset).setFinishWrite(finishWrite);
+ requestObserver.onNext(request.build());
+ if (finishWrite) {
+ requestObserver.onCompleted();
}
- } catch (RuntimeException e) {
- // Cancel RPC
- if (requestObserver != null) {
- requestObserver.onError(e);
+ if (finishLatch.getCount() <= numItems) {
+ // Current RPC errored before we finished sending.
+ if (!finishWrite) {
+ chunker.advanceInput();
+ }
}
- throw e;
- }
- if (requestObserver != null) {
- requestObserver.onCompleted(); // Finish last batch.
- }
- while (batches++ < numItems) {
- finishLatch.countDown(); // Non-sent batches.
}
finishLatch.await(options.remoteTimeout, TimeUnit.SECONDS);
if (exception.get() != null) {
@@ -399,33 +393,12 @@ public class GrpcActionCache implements RemoteActionCache {
}
@Override
- public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
- throws InterruptedException {
- ArrayList<ContentDigest> digests = new ArrayList<>();
- Chunker.Builder b = new Chunker.Builder().chunkSize(options.grpcMaxChunkSizeBytes);
- for (byte[] blob : blobs) {
- digests.add(ContentDigests.computeDigest(blob));
- b.addInput(blob);
- }
- ImmutableSet<ContentDigest> missing = getMissingDigests(digests);
+ public Digest uploadBlob(byte[] blob) throws InterruptedException {
+ Digest digest = Digests.computeDigest(blob);
+ ImmutableSet<Digest> missing = getMissingDigests(ImmutableList.of(digest));
try {
if (!missing.isEmpty()) {
- uploadChunks(missing.size(), b.onlyUseDigests(missing).build());
- }
- return ImmutableList.copyOf(digests);
- } catch (IOException e) {
- // This will never happen.
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public ContentDigest uploadBlob(byte[] blob) throws InterruptedException {
- ContentDigest digest = ContentDigests.computeDigest(blob);
- ImmutableSet<ContentDigest> missing = getMissingDigests(ImmutableList.of(digest));
- try {
- if (!missing.isEmpty()) {
- uploadChunks(1, Chunker.from(blob, options.grpcMaxChunkSizeBytes));
+ uploadChunks(1, Chunker.from(blob));
}
return digest;
} catch (IOException e) {
@@ -435,69 +408,20 @@ public class GrpcActionCache implements RemoteActionCache {
}
@Override
- public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException {
- return downloadBlobs(ImmutableList.of(digest)).get(0);
- }
-
- @Override
- public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
- throws CacheNotFoundException {
- // Send all the file requests in a single synchronous batch.
- // TODO(olaola): profile to maybe replace with separate concurrent requests.
- CasDownloadBlobRequest.Builder request = CasDownloadBlobRequest.newBuilder();
- for (ContentDigest digest : digests) {
- if (digest.getSizeBytes() > 0) {
- request.addDigest(digest); // We handle empty blobs locally.
- }
+ public byte[] downloadBlob(Digest digest) throws CacheNotFoundException {
+ if (digest.getSizeBytes() == 0) {
+ return new byte[0];
}
- Iterator<CasDownloadReply> replies = null;
- Map<ContentDigest, byte[]> results = new HashMap<>();
- int digestCount = request.getDigestCount();
- if (digestCount > 0) {
- replies = casIface.downloadBlob(request.build());
- while (digestCount-- > 0) {
- Preconditions.checkArgument(replies.hasNext());
- CasDownloadReply reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
- }
- BlobChunk chunk = reply.getData();
- ContentDigest digest = chunk.getDigest();
- // This is not enough, but better than nothing.
- Preconditions.checkArgument(digest.getSizeBytes() / 1000.0 < MAX_MEMORY_KBYTES);
- byte[] result = new byte[(int) digest.getSizeBytes()];
- ByteString data = chunk.getData();
- data.copyTo(result, 0);
- int offset = data.size();
- while (offset < result.length) {
- Preconditions.checkArgument(replies.hasNext());
- reply = replies.next();
- if (reply.hasStatus()) {
- handleDownloadStatus(reply.getStatus());
- }
- chunk = reply.getData();
- Preconditions.checkArgument(!chunk.hasDigest());
- Preconditions.checkArgument(chunk.getOffset() == offset);
- data = chunk.getData();
- data.copyTo(result, offset);
- offset += data.size();
- }
- results.put(digest, result);
- }
- }
-
- ArrayList<byte[]> result = new ArrayList<>();
- for (ContentDigest digest : digests) {
- if (digest.getSizeBytes() == 0) {
- result.add(new byte[0]);
- continue;
- }
- if (!results.containsKey(digest)) {
- throw new CacheNotFoundException(digest);
- }
- result.add(results.get(digest));
+ Iterator<ReadResponse> replies = readBlob(digest);
+ byte[] result = new byte[(int) digest.getSizeBytes()];
+ int offset = 0;
+ while (replies.hasNext()) {
+ ByteString data = replies.next().getData();
+ data.copyTo(result, offset);
+ offset += data.size();
}
- return ImmutableList.copyOf(result);
+ Preconditions.checkState(digest.getSizeBytes() == offset);
+ return result;
}
// Execution Cache API
@@ -505,30 +429,39 @@ public class GrpcActionCache implements RemoteActionCache {
/** Returns a cached result for a given Action digest, or null if not found in cache. */
@Override
public ActionResult getCachedActionResult(ActionKey actionKey) {
- ExecutionCacheRequest request =
- ExecutionCacheRequest.newBuilder().setActionDigest(actionKey.getDigest()).build();
- ExecutionCacheReply reply = iface.getCachedResult(request);
- ExecutionCacheStatus status = reply.getStatus();
- if (!status.getSucceeded()
- && status.getError() != ExecutionCacheStatus.ErrorCode.MISSING_RESULT) {
- throw new RuntimeException(status.getErrorDetail());
+ try {
+ return acBlockingStub
+ .get()
+ .getActionResult(
+ GetActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .build());
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() == Status.Code.NOT_FOUND) {
+ return null;
+ }
+ throw e;
}
- return reply.hasResult() ? reply.getResult() : null;
}
/** Sets the given result as result of the given Action. */
@Override
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws InterruptedException {
- ExecutionCacheSetRequest request =
- ExecutionCacheSetRequest.newBuilder()
- .setActionDigest(actionKey.getDigest())
- .setResult(result)
- .build();
- ExecutionCacheSetReply reply = iface.setCachedResult(request);
- ExecutionCacheStatus status = reply.getStatus();
- if (!status.getSucceeded() && status.getError() != ExecutionCacheStatus.ErrorCode.UNSUPPORTED) {
- throw new RuntimeException(status.getErrorDetail());
+ try {
+ acBlockingStub
+ .get()
+ .updateActionResult(
+ UpdateActionResultRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
+ .setActionDigest(actionKey.getDigest())
+ .setActionResult(result)
+ .build());
+ } catch (StatusRuntimeException e) {
+ if (e.getStatus().getCode() != Status.Code.UNIMPLEMENTED) {
+ throw e;
+ }
}
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java
deleted file mode 100644
index 529ff9c3db..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcCasInterface.java
+++ /dev/null
@@ -1,43 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import io.grpc.stub.StreamObserver;
-import java.util.Iterator;
-
-/**
- * An abstraction layer between the remote execution client and gRPC to support unit testing. This
- * interface covers the CAS RPC methods, see {@link CasServiceBlockingStub} and
- * {@link CasServiceStub}.
- */
-public interface GrpcCasInterface {
- CasLookupReply lookup(CasLookupRequest request);
- CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request);
- CasDownloadTreeMetadataReply downloadTreeMetadata(CasDownloadTreeMetadataRequest request);
- Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request);
- StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
- StreamObserver<CasUploadBlobReply> responseObserver);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java
deleted file mode 100644
index 375c2ab359..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionCacheInterface.java
+++ /dev/null
@@ -1,29 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-
-/**
- * An abstraction layer between the remote execution client and gRPC to support unit testing. This
- * interface covers the execution cache RPC methods, see {@link ExecutionCacheServiceBlockingStub}.
- */
-public interface GrpcExecutionCacheInterface {
- ExecutionCacheReply getCachedResult(ExecutionCacheRequest request);
- ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java
deleted file mode 100644
index fd44792b4a..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcExecutionInterface.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import java.util.Iterator;
-
-/**
- * An abstraction layer between the remote execution client and gRPC to support unit testing. This
- * interface covers the remote execution RPC methods, see {@link ExecuteServiceBlockingStub}.
- */
-public interface GrpcExecutionInterface {
- Iterator<ExecuteReply> execute(ExecuteRequest request);
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java
deleted file mode 100644
index 6e100ae1d0..0000000000
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcInterfaces.java
+++ /dev/null
@@ -1,131 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceBlockingStub;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceStub;
-import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceBlockingStub;
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceBlockingStub;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import io.grpc.Channel;
-import io.grpc.stub.StreamObserver;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
-
-/** Implementations of the gRPC interfaces that actually talk to gRPC. */
-public class GrpcInterfaces {
- /** Create a {@link GrpcCasInterface} instance that actually talks to gRPC. */
- public static GrpcCasInterface casInterface(
- final int grpcTimeoutSeconds,
- final Channel channel,
- final ChannelOptions channelOptions) {
- return new GrpcCasInterface() {
- private CasServiceBlockingStub getCasServiceBlockingStub() {
- return CasServiceGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- private CasServiceStub getCasServiceStub() {
- return CasServiceGrpc.newStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- @Override
- public CasLookupReply lookup(CasLookupRequest request) {
- return getCasServiceBlockingStub().lookup(request);
- }
-
- @Override
- public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) {
- return getCasServiceBlockingStub().uploadTreeMetadata(request);
- }
-
- @Override
- public CasDownloadTreeMetadataReply downloadTreeMetadata(
- CasDownloadTreeMetadataRequest request) {
- return getCasServiceBlockingStub().downloadTreeMetadata(request);
- }
-
- @Override
- public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) {
- return getCasServiceBlockingStub().downloadBlob(request);
- }
-
- @Override
- public StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
- StreamObserver<CasUploadBlobReply> responseObserver) {
- return getCasServiceStub().uploadBlob(responseObserver);
- }
- };
- }
-
- /** Create a {@link GrpcCasInterface} instance that actually talks to gRPC. */
- public static GrpcExecutionCacheInterface executionCacheInterface(
- final int grpcTimeoutSeconds,
- final Channel channel,
- final ChannelOptions channelOptions) {
- return new GrpcExecutionCacheInterface() {
- private ExecutionCacheServiceBlockingStub getExecutionCacheServiceBlockingStub() {
- return ExecutionCacheServiceGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(grpcTimeoutSeconds, TimeUnit.SECONDS);
- }
-
- @Override
- public ExecutionCacheReply getCachedResult(ExecutionCacheRequest request) {
- return getExecutionCacheServiceBlockingStub().getCachedResult(request);
- }
-
- @Override
- public ExecutionCacheSetReply setCachedResult(ExecutionCacheSetRequest request) {
- return getExecutionCacheServiceBlockingStub().setCachedResult(request);
- }
- };
- }
-
- /** Create a {@link GrpcExecutionInterface} instance that actually talks to gRPC. */
- public static GrpcExecutionInterface executionInterface(
- final int grpcTimeoutSeconds,
- final Channel channel,
- final ChannelOptions channelOptions) {
- return new GrpcExecutionInterface() {
- @Override
- public Iterator<ExecuteReply> execute(ExecuteRequest request) {
- ExecuteServiceBlockingStub stub =
- ExecuteServiceGrpc.newBlockingStub(channel)
- .withCallCredentials(channelOptions.getCallCredentials())
- .withDeadlineAfter(
- grpcTimeoutSeconds + request.getTimeoutMillis() / 1000, TimeUnit.SECONDS);
- return stub.execute(request);
- }
- };
- }
-}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
index eb79c91919..1950a724c3 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java
@@ -15,58 +15,54 @@
package com.google.devtools.build.lib.remote;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import io.grpc.ManagedChannel;
-import java.util.Iterator;
+import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionBlockingStub;
+import com.google.longrunning.Operation;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.Durations;
+import io.grpc.Channel;
+import io.grpc.protobuf.StatusProto;
+import java.util.concurrent.TimeUnit;
/** A remote work executor that uses gRPC for communicating the work, inputs and outputs. */
@ThreadSafe
-public class GrpcRemoteExecutor extends GrpcActionCache {
+public class GrpcRemoteExecutor {
+ private final RemoteOptions options;
+ private final ChannelOptions channelOptions;
+ private final Channel channel;
+
public static boolean isRemoteExecutionOptions(RemoteOptions options) {
return options.remoteExecutor != null;
}
- private final GrpcExecutionInterface executionIface;
-
- public GrpcRemoteExecutor(
- RemoteOptions options,
- GrpcCasInterface casIface,
- GrpcExecutionCacheInterface cacheIface,
- GrpcExecutionInterface executionIface) {
- super(options, casIface, cacheIface);
- this.executionIface = executionIface;
- }
-
- public GrpcRemoteExecutor(
- ManagedChannel channel, ChannelOptions channelOptions, RemoteOptions options) {
- super(
- options,
- GrpcInterfaces.casInterface(options.remoteTimeout, channel, channelOptions),
- GrpcInterfaces.executionCacheInterface(
- options.remoteTimeout, channel, channelOptions));
- this.executionIface =
- GrpcInterfaces.executionInterface(options.remoteTimeout, channel, channelOptions);
+ public GrpcRemoteExecutor(Channel channel, ChannelOptions channelOptions, RemoteOptions options) {
+ this.options = options;
+ this.channelOptions = channelOptions;
+ this.channel = channel;
}
- public ExecuteReply executeRemotely(ExecuteRequest request) {
- Iterator<ExecuteReply> replies = executionIface.execute(request);
- ExecuteReply reply = null;
- while (replies.hasNext()) {
- reply = replies.next();
- // We can handle the action execution progress here.
+ public ExecuteResponse executeRemotely(ExecuteRequest request) {
+ // TODO(olaola): handle longrunning Operations by using the Watcher API to wait for results.
+ // For now, only support actions with wait_for_completion = true.
+ Preconditions.checkArgument(request.getWaitForCompletion());
+ int actionSeconds = (int) Durations.toSeconds(request.getAction().getTimeout());
+ ExecutionBlockingStub stub =
+ ExecutionGrpc.newBlockingStub(channel)
+ .withCallCredentials(channelOptions.getCallCredentials())
+ .withDeadlineAfter(options.remoteTimeout + actionSeconds, TimeUnit.SECONDS);
+ Operation op = stub.execute(request);
+ Preconditions.checkState(op.getDone());
+ Preconditions.checkState(op.getResultCase() != Operation.ResultCase.RESULT_NOT_SET);
+ if (op.getResultCase() == Operation.ResultCase.ERROR) {
+ throw StatusProto.toStatusRuntimeException(op.getError());
}
- if (reply == null) {
- return ExecuteReply.newBuilder()
- .setStatus(
- ExecutionStatus.newBuilder()
- .setExecuted(false)
- .setSucceeded(false)
- .setError(ExecutionStatus.ErrorCode.UNKNOWN_ERROR)
- .setErrorDetail("Remote server terminated the connection"))
- .build();
+ try {
+ return op.getResponse().unpack(ExecuteResponse.class);
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
}
- return reply;
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/README.md b/src/main/java/com/google/devtools/build/lib/remote/README.md
index 4c85707e71..2525240b93 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/README.md
+++ b/src/main/java/com/google/devtools/build/lib/remote/README.md
@@ -13,7 +13,7 @@ When it's configured to use a remote cache, Bazel computes a hash for each actio
Starting at version 0.5.0, Bazel supports two caching protocols:
1. a HTTP-based REST protocol
-2. [a gRPC-based protocol](https://github.com/bazelbuild/bazel/blob/master/src/main/protobuf/remote_protocol.proto)
+2. [a gRPC-based protocol](https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto)
## Remote caching using the HTTP REST protocol
@@ -101,7 +101,7 @@ this directory to include security control.
## Remote caching using the gRPC protocol
-We're working on a [gRPC protocol](https://github.com/bazelbuild/bazel/blob/master/src/main/protobuf/remote_protocol.proto)
+We're working on a [gRPC protocol](https://github.com/googleapis/googleapis/blob/master/google/devtools/remoteexecution/v1test/remote_execution.proto)
that supports both remote caching and remote execution. As of this writing, there is only a single server-side implementation, which is not intended for production use.
### Bazel Setup
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
index 44a63c4d40..84fdb29430 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteActionCache.java
@@ -14,15 +14,14 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadCompatible;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Digest;
import java.io.IOException;
import java.util.Collection;
import javax.annotation.Nullable;
@@ -32,8 +31,8 @@ import javax.annotation.Nullable;
interface RemoteActionCache {
// CAS API
- // TODO(olaola): create a unified set of exceptions raised by the cache to encapsulate the
- // underlying CasStatus messages and gRPC errors errors.
+ // TODO(buchgr): consider removing the CacheNotFoundException, and replacing it with other
+ // ways to signal a cache miss.
/**
* Upload enough of the tree metadata and data into remote cache so that the entire tree can be
@@ -45,7 +44,7 @@ interface RemoteActionCache {
/**
* Download the entire tree data rooted by the given digest and write it into the given location.
*/
- void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException;
/**
@@ -68,7 +67,7 @@ interface RemoteActionCache {
*
* @return The key for fetching the file contents blob from cache.
*/
- ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException;
+ Digest uploadFileContents(Path file) throws IOException, InterruptedException;
/**
* Put the input file contents in cache if it is not already in it. No-op if the data is already
@@ -76,22 +75,14 @@ interface RemoteActionCache {
*
* @return The key for fetching the file contents blob from cache.
*/
- ContentDigest uploadFileContents(
- ActionInput input, Path execRoot, ActionInputFileCache inputCache)
+ Digest uploadFileContents(ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException;
- /** Upload the given blobs to the cache, and return their digests. */
- ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs) throws InterruptedException;
-
/** Upload the given blob to the cache, and return its digest. */
- ContentDigest uploadBlob(byte[] blob) throws InterruptedException;
+ Digest uploadBlob(byte[] blob) throws InterruptedException;
/** Download and return a blob with a given digest from the cache. */
- byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException;
-
- /** Download and return blobs with given digests from the cache. */
- ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
- throws CacheNotFoundException;
+ byte[] downloadBlob(Digest digest) throws CacheNotFoundException;
// Execution Cache API
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
index fcdb44a14a..d779aa7073 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java
@@ -72,30 +72,6 @@ public final class RemoteOptions extends OptionsBase {
public String remoteCache;
@Option(
- name = "grpc_max_chunk_size_bytes",
- defaultValue = "16000",
- category = "remote",
- help = "The maximal number of data bytes to be sent in a single message."
- )
- public int grpcMaxChunkSizeBytes;
-
- @Option(
- name = "grpc_max_batch_inputs",
- defaultValue = "100",
- category = "remote",
- help = "The maximal number of input files to be sent in a single batch."
- )
- public int grpcMaxBatchInputs;
-
- @Option(
- name = "grpc_max_batch_size_bytes",
- defaultValue = "10485760", // 10MB
- category = "remote",
- help = "The maximal number of input bytes to be sent in a single batch."
- )
- public int grpcMaxBatchSizeBytes;
-
- @Option(
name = "remote_timeout",
defaultValue = "60",
category = "remote",
@@ -134,4 +110,12 @@ public final class RemoteOptions extends OptionsBase {
help = "Temporary, for testing only. Manually set a Platform to pass to remote execution."
)
public String experimentalRemotePlatformOverride;
+
+ @Option(
+ name = "remote_instance_name",
+ defaultValue = "",
+ category = "remote",
+ help = "Value to pass as instance_name in the remote execution API."
+ )
+ public String remoteInstanceName;
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
index bbb0166e4e..8807d0652b 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java
@@ -14,32 +14,28 @@
package com.google.devtools.build.lib.remote;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
import com.google.devtools.build.lib.actions.ActionInput;
+import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.Spawn;
-import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
+import com.google.devtools.build.lib.actions.Spawns;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.exec.SpawnResult;
import com.google.devtools.build.lib.exec.SpawnResult.Status;
import com.google.devtools.build.lib.exec.SpawnRunner;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.StatusRuntimeException;
@@ -49,9 +45,7 @@ import java.util.List;
import java.util.SortedMap;
import java.util.TreeSet;
-/**
- * A client for the remote execution service.
- */
+/** A client for the remote execution service. */
@ThreadSafe
final class RemoteSpawnRunner implements SpawnRunner {
private final Path execRoot;
@@ -60,11 +54,13 @@ final class RemoteSpawnRunner implements SpawnRunner {
private final Platform platform;
private final GrpcRemoteExecutor executor;
+ private final GrpcActionCache remoteCache;
RemoteSpawnRunner(
Path execRoot,
RemoteOptions options,
- GrpcRemoteExecutor executor) {
+ GrpcRemoteExecutor executor,
+ GrpcActionCache remoteCache) {
this.execRoot = execRoot;
this.options = options;
if (options.experimentalRemotePlatformOverride != null) {
@@ -79,27 +75,12 @@ final class RemoteSpawnRunner implements SpawnRunner {
platform = null;
}
this.executor = executor;
- }
-
- RemoteSpawnRunner(
- Path execRoot,
- RemoteOptions options,
- AuthAndTLSOptions authTlsOptions) {
- this(execRoot, options, connect(options, authTlsOptions));
- }
-
- private static GrpcRemoteExecutor connect(RemoteOptions options,
- AuthAndTLSOptions authTlsOptions) {
- Preconditions.checkArgument(GrpcRemoteExecutor.isRemoteExecutionOptions(options));
- ChannelOptions channelOptions = ChannelOptions.create(authTlsOptions,
- options.grpcMaxChunkSizeBytes);
- return new GrpcRemoteExecutor(
- RemoteUtils.createChannel(options.remoteExecutor, channelOptions), channelOptions, options);
+ this.remoteCache = remoteCache;
}
@Override
public SpawnResult exec(Spawn spawn, SpawnExecutionPolicy policy)
- throws InterruptedException, IOException {
+ throws ExecException, InterruptedException, IOException {
ActionExecutionMetadata owner = spawn.getResourceOwner();
if (owner.getOwner() != null) {
policy.report(ProgressStatus.EXECUTING);
@@ -116,46 +97,38 @@ final class RemoteSpawnRunner implements SpawnRunner {
Action action =
buildAction(
spawn.getOutputFiles(),
- ContentDigests.computeDigest(command),
- repository.getMerkleDigest(inputRoot));
+ Digests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot),
+ // TODO(olaola): set sensible local and remote timeouts.
+ Spawns.getTimeoutSeconds(spawn, 120));
- ActionKey actionKey = ContentDigests.computeActionKey(action);
+ ActionKey actionKey = Digests.computeActionKey(action);
ActionResult result =
- this.options.remoteAcceptCached ? executor.getCachedActionResult(actionKey) : null;
+ options.remoteAcceptCached ? remoteCache.getCachedActionResult(actionKey) : null;
if (result == null) {
// Cache miss or we don't accept cache hits.
// Upload the command and all the inputs into the remote cache.
- executor.uploadBlob(command.toByteArray());
- // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
- executor.uploadTree(repository, execRoot, inputRoot);
+ remoteCache.uploadBlob(command.toByteArray());
+ remoteCache.uploadTree(repository, execRoot, inputRoot);
// TODO(olaola): set BuildInfo and input total bytes as well.
ExecuteRequest.Builder request =
ExecuteRequest.newBuilder()
+ .setInstanceName(options.remoteInstanceName)
.setAction(action)
- .setAcceptCached(this.options.remoteAcceptCached)
+ .setWaitForCompletion(true)
.setTotalInputFileCount(inputMap.size())
- .setTimeoutMillis(policy.getTimeoutMillis());
- ExecuteReply reply = executor.executeRemotely(request.build());
- ExecutionStatus status = reply.getStatus();
-
- if (!status.getSucceeded()
- && (status.getError() != ExecutionStatus.ErrorCode.EXEC_FAILED)) {
- return new SpawnResult.Builder()
- // TODO(ulfjack): Improve the translation of the error status.
- .setStatus(Status.EXECUTION_FAILED)
- .setExitCode(-1)
- .build();
- }
-
- result = reply.getResult();
+ .setSkipCacheLookup(!options.remoteAcceptCached);
+ result = executor.executeRemotely(request.build()).getResult();
}
// TODO(ulfjack): Download stdout, stderr, and the output files in a single call.
- passRemoteOutErr(executor, result, policy.getFileOutErr());
- executor.downloadAllResults(result, execRoot);
+ passRemoteOutErr(remoteCache, result, policy.getFileOutErr());
+ if (result.getExitCode() == 0) {
+ remoteCache.downloadAllResults(result, execRoot);
+ }
return new SpawnResult.Builder()
- .setStatus(Status.SUCCESS)
- .setExitCode(result.getReturnCode())
+ .setStatus(Status.SUCCESS) // Even if the action failed with non-zero exit code.
+ .setExitCode(result.getExitCode())
.build();
} catch (StatusRuntimeException | CacheNotFoundException e) {
throw new IOException(e);
@@ -163,37 +136,58 @@ final class RemoteSpawnRunner implements SpawnRunner {
}
private Action buildAction(
- Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Collection<? extends ActionInput> outputs,
+ Digest command,
+ Digest inputRoot,
+ long timeoutSeconds) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
// Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
for (ActionInput output : outputs) {
- action.addOutputPath(output.getExecPathString());
+      // TODO: output directories should be handled here when they are supported.
+ action.addOutputFiles(output.getExecPathString());
}
if (platform != null) {
action.setPlatform(platform);
}
+ action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds));
return action.build();
}
private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
Command.Builder command = Command.newBuilder();
- command.addAllArgv(arguments);
+ command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(environment.keySet());
for (String var : variables) {
- command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var));
}
return command.build();
}
private static void passRemoteOutErr(
- RemoteActionCache cache, ActionResult result, FileOutErr outErr)
- throws CacheNotFoundException {
- ImmutableList<byte[]> streams =
- cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
- outErr.printOut(new String(streams.get(0), UTF_8));
- outErr.printErr(new String(streams.get(1), UTF_8));
+ RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException {
+ try {
+ if (!result.getStdoutRaw().isEmpty()) {
+ result.getStdoutRaw().writeTo(outErr.getOutputStream());
+ outErr.getOutputStream().flush();
+ } else if (result.hasStdoutDigest()) {
+ byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest());
+ outErr.getOutputStream().write(stdoutBytes);
+ outErr.getOutputStream().flush();
+ }
+ if (!result.getStderrRaw().isEmpty()) {
+ result.getStderrRaw().writeTo(outErr.getErrorStream());
+ outErr.getErrorStream().flush();
+ } else if (result.hasStderrDigest()) {
+ byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest());
+ outErr.getErrorStream().write(stderrBytes);
+ outErr.getErrorStream().flush();
+ }
+ } catch (CacheNotFoundException e) {
+ outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss.");
+ outErr.getOutputStream().flush();
+ }
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
index f3e12f07ee..3f7698252e 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnStrategy.java
@@ -15,7 +15,6 @@
package com.google.devtools.build.lib.remote;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.ActionInput;
@@ -32,15 +31,7 @@ import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
import com.google.devtools.build.lib.events.Event;
import com.google.devtools.build.lib.events.EventHandler;
import com.google.devtools.build.lib.exec.SpawnInputExpander;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.rules.fileset.FilesetActionContext;
import com.google.devtools.build.lib.standalone.StandaloneSpawnStrategy;
@@ -48,6 +39,14 @@ import com.google.devtools.build.lib.util.CommandFailureUtils;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.protobuf.Duration;
import com.google.protobuf.TextFormat;
import com.google.protobuf.TextFormat.ParseException;
import io.grpc.StatusRuntimeException;
@@ -88,12 +87,12 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
this.standaloneStrategy = new StandaloneSpawnStrategy(execRoot, verboseFailures, productName);
this.verboseFailures = verboseFailures;
this.remoteOptions = remoteOptions;
- channelOptions = ChannelOptions.create(authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes);
+ channelOptions = ChannelOptions.create(authTlsOptions);
if (remoteOptions.experimentalRemotePlatformOverride != null) {
Platform.Builder platformBuilder = Platform.newBuilder();
try {
- TextFormat.getParser().merge(remoteOptions.experimentalRemotePlatformOverride,
- platformBuilder);
+ TextFormat.getParser()
+ .merge(remoteOptions.experimentalRemotePlatformOverride, platformBuilder);
} catch (ParseException e) {
throw new IllegalArgumentException(
"Failed to parse --experimental_remote_platform_override", e);
@@ -105,27 +104,32 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
private Action buildAction(
- Collection<? extends ActionInput> outputs, ContentDigest command, ContentDigest inputRoot) {
+ Collection<? extends ActionInput> outputs,
+ Digest command,
+ Digest inputRoot,
+ long timeoutSeconds) {
Action.Builder action = Action.newBuilder();
action.setCommandDigest(command);
action.setInputRootDigest(inputRoot);
// Somewhat ugly: we rely on the stable order of outputs here for remote action caching.
for (ActionInput output : outputs) {
- action.addOutputPath(output.getExecPathString());
+      // TODO: output directories should be handled here when they are supported.
+ action.addOutputFiles(output.getExecPathString());
}
if (platform != null) {
action.setPlatform(platform);
}
+ action.setTimeout(Duration.newBuilder().setSeconds(timeoutSeconds));
return action.build();
}
private Command buildCommand(List<String> arguments, ImmutableMap<String, String> environment) {
Command.Builder command = Command.newBuilder();
- command.addAllArgv(arguments);
+ command.addAllArguments(arguments);
// Sorting the environment pairs by variable name.
TreeSet<String> variables = new TreeSet<>(environment.keySet());
for (String var : variables) {
- command.addEnvironmentBuilder().setVariable(var).setValue(environment.get(var));
+ command.addEnvironmentVariablesBuilder().setName(var).setValue(environment.get(var));
}
return command.build();
}
@@ -137,11 +141,11 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
private void execLocally(
Spawn spawn,
ActionExecutionContext actionExecutionContext,
- RemoteActionCache actionCache,
+ RemoteActionCache remoteCache,
ActionKey actionKey)
throws ExecException, InterruptedException {
standaloneStrategy.exec(spawn, actionExecutionContext);
- if (remoteOptions.remoteUploadLocalResults && actionCache != null && actionKey != null) {
+ if (remoteOptions.remoteUploadLocalResults && remoteCache != null && actionKey != null) {
ArrayList<Path> outputFiles = new ArrayList<>();
for (ActionInput output : spawn.getOutputFiles()) {
Path outputFile = execRoot.getRelative(output.getExecPathString());
@@ -155,17 +159,17 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
try {
ActionResult.Builder result = ActionResult.newBuilder();
- actionCache.uploadAllResults(execRoot, outputFiles, result);
+ remoteCache.uploadAllResults(execRoot, outputFiles, result);
FileOutErr outErr = actionExecutionContext.getFileOutErr();
if (outErr.getErrorPath().exists()) {
- ContentDigest stderr = actionCache.uploadFileContents(outErr.getErrorPath());
+ Digest stderr = remoteCache.uploadFileContents(outErr.getErrorPath());
result.setStderrDigest(stderr);
}
if (outErr.getOutputPath().exists()) {
- ContentDigest stdout = actionCache.uploadFileContents(outErr.getOutputPath());
+ Digest stdout = remoteCache.uploadFileContents(outErr.getOutputPath());
result.setStdoutDigest(stdout);
}
- actionCache.setCachedActionResult(actionKey, result.build());
+ remoteCache.setCachedActionResult(actionKey, result.build());
// Handle all cache errors here.
} catch (IOException e) {
throw new UserExecException("Unexpected IO error.", e);
@@ -188,21 +192,25 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
private static void passRemoteOutErr(
RemoteActionCache cache, ActionResult result, FileOutErr outErr) throws IOException {
try {
- ImmutableList<byte[]> streams =
- cache.downloadBlobs(ImmutableList.of(result.getStdoutDigest(), result.getStderrDigest()));
- byte[] stdout = streams.get(0);
- byte[] stderr = streams.get(1);
- // Don't round-trip through String - write to the output streams directly.
- if (stdout.length != 0) {
- outErr.getOutputStream().write(stdout);
+ if (!result.getStdoutRaw().isEmpty()) {
+ result.getStdoutRaw().writeTo(outErr.getOutputStream());
+ outErr.getOutputStream().flush();
+ } else if (result.hasStdoutDigest()) {
+ byte[] stdoutBytes = cache.downloadBlob(result.getStdoutDigest());
+ outErr.getOutputStream().write(stdoutBytes);
outErr.getOutputStream().flush();
}
- if (stderr.length != 0) {
- outErr.getErrorStream().write(stderr);
+ if (!result.getStderrRaw().isEmpty()) {
+ result.getStderrRaw().writeTo(outErr.getErrorStream());
+ outErr.getErrorStream().flush();
+ } else if (result.hasStderrDigest()) {
+ byte[] stderrBytes = cache.downloadBlob(result.getStderrDigest());
+ outErr.getErrorStream().write(stderrBytes);
outErr.getErrorStream().flush();
}
} catch (CacheNotFoundException e) {
- // Ignoring.
+ outErr.printOutLn("Failed to fetch remote stdout/err due to cache miss.");
+ outErr.getOutputStream().flush();
}
}
@@ -220,19 +228,23 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
Executor executor = actionExecutionContext.getExecutor();
EventHandler eventHandler = executor.getEventHandler();
- RemoteActionCache actionCache = null;
+ RemoteActionCache remoteCache = null;
GrpcRemoteExecutor workExecutor = null;
if (spawn.isRemotable()) {
// Initialize remote cache and execution handlers. We use separate handlers for every
// action to enable server-side parallelism (need a different gRPC channel per action).
if (SimpleBlobStoreFactory.isRemoteCacheOptions(remoteOptions)) {
- actionCache = new SimpleBlobStoreActionCache(SimpleBlobStoreFactory.create(remoteOptions));
+ remoteCache = new SimpleBlobStoreActionCache(SimpleBlobStoreFactory.create(remoteOptions));
} else if (GrpcActionCache.isRemoteCacheOptions(remoteOptions)) {
- actionCache = new GrpcActionCache(remoteOptions, channelOptions);
+ remoteCache =
+ new GrpcActionCache(
+ RemoteUtils.createChannel(remoteOptions.remoteCache, channelOptions),
+ channelOptions,
+ remoteOptions);
}
- // Otherwise actionCache remains null and remote caching/execution are disabled.
+ // Otherwise remoteCache remains null and remote caching/execution are disabled.
- if (actionCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(remoteOptions)) {
+ if (remoteCache != null && GrpcRemoteExecutor.isRemoteExecutionOptions(remoteOptions)) {
workExecutor =
new GrpcRemoteExecutor(
RemoteUtils.createChannel(remoteOptions.remoteExecutor, channelOptions),
@@ -240,15 +252,16 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
remoteOptions);
}
}
- if (!spawn.isRemotable() || actionCache == null) {
+ if (!spawn.isRemotable() || remoteCache == null) {
standaloneStrategy.exec(spawn, actionExecutionContext);
return;
}
if (executor.reportsSubcommands()) {
executor.reportSubcommand(spawn);
}
- executor.getEventBus().post(
- ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote"));
+ executor
+ .getEventBus()
+ .post(ActionStatusMessage.runningStrategy(spawn.getResourceOwner(), "remote"));
try {
// Temporary hack: the TreeNodeRepository should be created and maintained upstream!
@@ -266,22 +279,25 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
Action action =
buildAction(
spawn.getOutputFiles(),
- ContentDigests.computeDigest(command),
- repository.getMerkleDigest(inputRoot));
+ Digests.computeDigest(command),
+ repository.getMerkleDigest(inputRoot),
+              // TODO(olaola): set sensible local and remote timeouts.
+ Spawns.getTimeoutSeconds(spawn, 120));
// Look up action cache, and reuse the action output if it is found.
- actionKey = ContentDigests.computeActionKey(action);
- ActionResult result = this.remoteOptions.remoteAcceptCached
- ? actionCache.getCachedActionResult(actionKey)
- : null;
+ actionKey = Digests.computeActionKey(action);
+ ActionResult result =
+ this.remoteOptions.remoteAcceptCached
+ ? remoteCache.getCachedActionResult(actionKey)
+ : null;
boolean acceptCachedResult = this.remoteOptions.remoteAcceptCached;
if (result != null) {
// We don't cache failed actions, so we know the outputs exist.
// For now, download all outputs locally; in the future, we can reuse the digests to
// just update the TreeNodeRepository and continue the build.
try {
- actionCache.downloadAllResults(result, execRoot);
- passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
+ remoteCache.downloadAllResults(result, execRoot);
+ passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr());
return;
} catch (CacheNotFoundException e) {
acceptCachedResult = false; // Retry the action remotely and invalidate the results.
@@ -289,47 +305,39 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
if (workExecutor == null) {
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
return;
}
// Upload the command and all the inputs into the remote cache.
- actionCache.uploadBlob(command.toByteArray());
- // TODO(olaola): this should use the ActionInputFileCache for SHA1 digests!
- actionCache.uploadTree(repository, execRoot, inputRoot);
+ remoteCache.uploadBlob(command.toByteArray());
+ remoteCache.uploadTree(repository, execRoot, inputRoot);
// TODO(olaola): set BuildInfo and input total bytes as well.
ExecuteRequest.Builder request =
ExecuteRequest.newBuilder()
+ .setInstanceName(remoteOptions.remoteInstanceName)
.setAction(action)
- .setAcceptCached(acceptCachedResult)
+ .setWaitForCompletion(true)
.setTotalInputFileCount(inputMap.size())
- .setTimeoutMillis(1000 * Spawns.getTimeoutSeconds(spawn, 120));
- // TODO(olaola): set sensible local and remote timouts.
- ExecuteReply reply = workExecutor.executeRemotely(request.build());
- ExecutionStatus status = reply.getStatus();
+ .setSkipCacheLookup(!acceptCachedResult);
+ ExecuteResponse reply = workExecutor.executeRemotely(request.build());
result = reply.getResult();
- // We do not want to pass on the remote stdout and strerr if we are going to retry the
- // action.
- if (status.getSucceeded()) {
- passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
- actionCache.downloadAllResults(result, execRoot);
- if (result.getReturnCode() != 0) {
- String cwd = executor.getExecRoot().getPathString();
- String message =
- CommandFailureUtils.describeCommandFailure(
- verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd);
- throw new UserExecException(message + ": Exit " + result.getReturnCode());
- }
+ if (remoteOptions.remoteLocalFallback && result.getExitCode() != 0) {
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
return;
}
- if (status.getError() == ExecutionStatus.ErrorCode.EXEC_FAILED
- || !remoteOptions.remoteLocalFallback) {
- passRemoteOutErr(actionCache, result, actionExecutionContext.getFileOutErr());
- throw new UserExecException(status.getErrorDetail());
+ passRemoteOutErr(remoteCache, result, actionExecutionContext.getFileOutErr());
+ if (result.getExitCode() == 0) {
+ remoteCache.downloadAllResults(result, execRoot);
+ } else {
+ String cwd = executor.getExecRoot().getPathString();
+ String message =
+ CommandFailureUtils.describeCommandFailure(
+ verboseFailures, spawn.getArguments(), spawn.getEnvironment(), cwd);
+ throw new UserExecException(message + ": Exit " + result.getExitCode());
}
// For now, we retry locally on all other remote errors.
// TODO(olaola): add remote retries on cache miss errors.
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
} catch (IOException e) {
throw new UserExecException("Unexpected IO error.", e);
} catch (InterruptedException e) {
@@ -343,14 +351,15 @@ final class RemoteSpawnStrategy implements SpawnActionContext {
}
eventHandler.handle(Event.warn(mnemonic + " remote work failed (" + e + ")" + stackTrace));
if (remoteOptions.remoteLocalFallback) {
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
} else {
throw new UserExecException(e);
}
} catch (CacheNotFoundException e) {
+ // TODO(olaola): handle this exception by reuploading / reexecuting the action remotely.
eventHandler.handle(Event.warn(mnemonic + " remote work results cache miss (" + e + ")"));
if (remoteOptions.remoteLocalFallback) {
- execLocally(spawn, actionExecutionContext, actionCache, actionKey);
+ execLocally(spawn, actionExecutionContext, remoteCache, actionKey);
} else {
throw new UserExecException(e);
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
index 845f5411fe..58b2c94bf8 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteUtils.java
@@ -26,8 +26,7 @@ public final class RemoteUtils {
NettyChannelBuilder builder =
NettyChannelBuilder.forTarget(target)
.negotiationType(
- channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT)
- .maxMessageSize(channelOptions.maxMessageSize());
+ channelOptions.tlsEnabled() ? NegotiationType.TLS : NegotiationType.PLAINTEXT);
if (channelOptions.getSslContext() != null) {
builder.sslContext(channelOptions.getSslContext());
if (channelOptions.getTlsAuthorityOverride() != null) {
diff --git a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
index bb1ca512d5..75d208a687 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/SimpleBlobStoreActionCache.java
@@ -14,28 +14,27 @@
package com.google.devtools.build.lib.remote;
-import com.google.common.collect.ImmutableList;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileMetadata;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Output.ContentCase;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
+import com.google.devtools.remoteexecution.v1test.DirectoryNode;
+import com.google.devtools.remoteexecution.v1test.FileNode;
+import com.google.devtools.remoteexecution.v1test.OutputDirectory;
+import com.google.devtools.remoteexecution.v1test.OutputFile;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Semaphore;
@@ -59,8 +58,8 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
public void uploadTree(TreeNodeRepository repository, Path execRoot, TreeNode root)
throws IOException, InterruptedException {
repository.computeMerkleDigests(root);
- for (FileNode fileNode : repository.treeToFileNodes(root)) {
- uploadBlob(fileNode.toByteArray());
+ for (Directory directory : repository.treeToDirectories(root)) {
+ uploadBlob(directory.toByteArray());
}
// TODO(ulfjack): Only upload files that aren't in the CAS yet?
for (TreeNode leaf : repository.leaves(root)) {
@@ -69,26 +68,26 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
@Override
- public void downloadTree(ContentDigest rootDigest, Path rootLocation)
+ public void downloadTree(Digest rootDigest, Path rootLocation)
throws IOException, CacheNotFoundException {
- FileNode fileNode = FileNode.parseFrom(downloadBlob(rootDigest));
- if (fileNode.hasFileMetadata()) {
- FileMetadata meta = fileNode.getFileMetadata();
- downloadFileContents(meta.getDigest(), rootLocation, meta.getExecutable());
+ Directory directory = Directory.parseFrom(downloadBlob(rootDigest));
+ for (FileNode file : directory.getFilesList()) {
+ downloadFileContents(
+ file.getDigest(), rootLocation.getRelative(file.getName()), file.getIsExecutable());
}
- for (FileNode.Child child : fileNode.getChildList()) {
- downloadTree(child.getDigest(), rootLocation.getRelative(child.getPath()));
+ for (DirectoryNode child : directory.getDirectoriesList()) {
+ downloadTree(child.getDigest(), rootLocation.getRelative(child.getName()));
}
}
@Override
- public ContentDigest uploadFileContents(Path file) throws IOException, InterruptedException {
+ public Digest uploadFileContents(Path file) throws IOException, InterruptedException {
// This unconditionally reads the whole file into memory first!
return uploadBlob(ByteString.readFrom(file.getInputStream()).toByteArray());
}
@Override
- public ContentDigest uploadFileContents(
+ public Digest uploadFileContents(
ActionInput input, Path execRoot, ActionInputFileCache inputCache)
throws IOException, InterruptedException {
// This unconditionally reads the whole file into memory first!
@@ -96,26 +95,31 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
((VirtualActionInput) input).writeTo(buffer);
byte[] blob = buffer.toByteArray();
- return uploadBlob(blob, ContentDigests.computeDigest(blob));
+ return uploadBlob(blob, Digests.computeDigest(blob));
}
return uploadBlob(
ByteString.readFrom(execRoot.getRelative(input.getExecPathString()).getInputStream())
.toByteArray(),
- ContentDigests.getDigestFromInputCache(input, inputCache));
+ Digests.getDigestFromInputCache(input, inputCache));
}
@Override
public void downloadAllResults(ActionResult result, Path execRoot)
throws IOException, CacheNotFoundException {
- for (Output output : result.getOutputList()) {
- if (output.getContentCase() == ContentCase.FILE_METADATA) {
- FileMetadata m = output.getFileMetadata();
- downloadFileContents(
- m.getDigest(), execRoot.getRelative(output.getPath()), m.getExecutable());
+ for (OutputFile file : result.getOutputFilesList()) {
+ if (!file.getContent().isEmpty()) {
+ createFile(
+ file.getContent().toByteArray(),
+ execRoot.getRelative(file.getPath()),
+ file.getIsExecutable());
} else {
- downloadTree(output.getDigest(), execRoot.getRelative(output.getPath()));
+ downloadFileContents(
+ file.getDigest(), execRoot.getRelative(file.getPath()), file.getIsExecutable());
}
}
+ for (OutputDirectory directory : result.getOutputDirectoriesList()) {
+ downloadTree(directory.getDigest(), execRoot.getRelative(directory.getPath()));
+ }
}
@Override
@@ -130,22 +134,25 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
// TreeNodeRepository to call uploadTree.
throw new UnsupportedOperationException("Storing a directory is not yet supported.");
}
+ // TODO(olaola): inline small file contents here.
// First put the file content to cache.
- ContentDigest digest = uploadFileContents(file);
+ Digest digest = uploadFileContents(file);
// Add to protobuf.
result
- .addOutputBuilder()
+ .addOutputFilesBuilder()
.setPath(file.relativeTo(execRoot).getPathString())
- .getFileMetadataBuilder()
.setDigest(digest)
- .setExecutable(file.isExecutable());
+ .setIsExecutable(file.isExecutable());
}
}
- private void downloadFileContents(ContentDigest digest, Path dest, boolean executable)
+ private void downloadFileContents(Digest digest, Path dest, boolean executable)
throws IOException, CacheNotFoundException {
// This unconditionally downloads the whole file into memory first!
- byte[] contents = downloadBlob(digest);
+ createFile(downloadBlob(digest), dest, executable);
+ }
+
+ private void createFile(byte[] contents, Path dest, boolean executable) throws IOException {
FileSystemUtils.createDirectoryAndParents(dest.getParentDirectory());
try (OutputStream stream = dest.getOutputStream()) {
stream.write(contents);
@@ -153,16 +160,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
dest.setExecutable(executable);
}
- @Override
- public ImmutableList<ContentDigest> uploadBlobs(Iterable<byte[]> blobs)
- throws InterruptedException {
- ArrayList<ContentDigest> digests = new ArrayList<>();
- for (byte[] blob : blobs) {
- digests.add(uploadBlob(blob));
- }
- return ImmutableList.copyOf(digests);
- }
-
private void checkBlobSize(long blobSizeKBytes, String type) {
Preconditions.checkArgument(
blobSizeKBytes < MAX_MEMORY_KBYTES,
@@ -172,16 +169,16 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
@Override
- public ContentDigest uploadBlob(byte[] blob) throws InterruptedException {
- return uploadBlob(blob, ContentDigests.computeDigest(blob));
+ public Digest uploadBlob(byte[] blob) throws InterruptedException {
+ return uploadBlob(blob, Digests.computeDigest(blob));
}
- private ContentDigest uploadBlob(byte[] blob, ContentDigest digest) throws InterruptedException {
+ private Digest uploadBlob(byte[] blob, Digest digest) throws InterruptedException {
int blobSizeKBytes = blob.length / 1024;
checkBlobSize(blobSizeKBytes, "Upload");
uploadMemoryAvailable.acquire(blobSizeKBytes);
try {
- blobStore.put(ContentDigests.toHexString(digest), blob);
+ blobStore.put(digest.getHash(), blob);
} finally {
uploadMemoryAvailable.release(blobSizeKBytes);
}
@@ -189,36 +186,26 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
}
@Override
- public byte[] downloadBlob(ContentDigest digest) throws CacheNotFoundException {
+ public byte[] downloadBlob(Digest digest) throws CacheNotFoundException {
if (digest.getSizeBytes() == 0) {
return new byte[0];
}
// This unconditionally downloads the whole blob into memory!
checkBlobSize(digest.getSizeBytes() / 1024, "Download");
- byte[] data = blobStore.get(ContentDigests.toHexString(digest));
+ byte[] data = blobStore.get(digest.getHash());
if (data == null) {
throw new CacheNotFoundException(digest);
}
return data;
}
- @Override
- public ImmutableList<byte[]> downloadBlobs(Iterable<ContentDigest> digests)
- throws CacheNotFoundException {
- ArrayList<byte[]> blobs = new ArrayList<>();
- for (ContentDigest c : digests) {
- blobs.add(downloadBlob(c));
- }
- return ImmutableList.copyOf(blobs);
- }
-
- public boolean containsKey(ContentDigest digest) {
- return blobStore.containsKey(ContentDigests.toHexString(digest));
+ public boolean containsKey(Digest digest) {
+ return blobStore.containsKey(digest.getHash());
}
@Override
public ActionResult getCachedActionResult(ActionKey actionKey) {
- byte[] data = blobStore.get(ContentDigests.toHexString(actionKey.getDigest()));
+ byte[] data = blobStore.get(actionKey.getDigest().getHash());
if (data == null) {
return null;
}
@@ -232,6 +219,6 @@ public final class SimpleBlobStoreActionCache implements RemoteActionCache {
@Override
public void setCachedActionResult(ActionKey actionKey, ActionResult result)
throws InterruptedException {
- blobStore.put(ContentDigests.toHexString(actionKey.getDigest()), result.toByteArray());
+ blobStore.put(actionKey.getDigest().getHash(), result.toByteArray());
}
}
diff --git a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
index 8272295a84..dba3a14c12 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
+++ b/src/main/java/com/google/devtools/build/lib/remote/TreeNodeRepository.java
@@ -29,11 +29,11 @@ import com.google.devtools.build.lib.concurrent.BlazeInterners;
import com.google.devtools.build.lib.concurrent.ThreadSafety.Immutable;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.exec.SpawnInputExpander;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.ArrayList;
@@ -185,11 +185,11 @@ public final class TreeNodeRepository extends TreeTraverser<TreeNodeRepository.T
// be part of the state.
private final Path execRoot;
private final ActionInputFileCache inputFileCache;
- private final Map<TreeNode, ContentDigest> treeNodeDigestCache = new HashMap<>();
- private final Map<ContentDigest, TreeNode> digestTreeNodeCache = new HashMap<>();
- private final Map<TreeNode, FileNode> fileNodeCache = new HashMap<>();
- private final Map<VirtualActionInput, ContentDigest> virtualInputDigestCache = new HashMap<>();
- private final Map<ContentDigest, VirtualActionInput> digestVirtualInputCache = new HashMap<>();
+ private final Map<TreeNode, Digest> treeNodeDigestCache = new HashMap<>();
+ private final Map<Digest, TreeNode> digestTreeNodeCache = new HashMap<>();
+ private final Map<TreeNode, Directory> directoryCache = new HashMap<>();
+ private final Map<VirtualActionInput, Digest> virtualInputDigestCache = new HashMap<>();
+ private final Map<Digest, VirtualActionInput> digestVirtualInputCache = new HashMap<>();
public TreeNodeRepository(Path execRoot, ActionInputFileCache inputFileCache) {
this.execRoot = execRoot;
@@ -298,125 +298,125 @@ public final class TreeNodeRepository extends TreeTraverser<TreeNodeRepository.T
return interner.intern(new TreeNode(entries));
}
- private synchronized FileNode getOrComputeFileNode(TreeNode node) throws IOException {
+ private synchronized Directory getOrComputeDirectory(TreeNode node) throws IOException {
// Assumes all child digests have already been computed!
- FileNode fileNode = fileNodeCache.get(node);
- if (fileNode == null) {
- FileNode.Builder b = FileNode.newBuilder();
- if (node.isLeaf()) {
- ActionInput input = node.getActionInput();
- if (input instanceof VirtualActionInput) {
- VirtualActionInput virtualInput = (VirtualActionInput) input;
- ContentDigest digest = ContentDigests.computeDigest(virtualInput);
- virtualInputDigestCache.put(virtualInput, digest);
- // There may be multiple inputs with the same digest. In that case, we don't care which
- // one we get back from the digestVirtualInputCache later.
- digestVirtualInputCache.put(digest, virtualInput);
- b.getFileMetadataBuilder()
- .setDigest(digest)
- // We always declare virtual action inputs as non-executable for now.
- .setExecutable(false);
+ Preconditions.checkArgument(!node.isLeaf());
+ Directory directory = directoryCache.get(node);
+ if (directory == null) {
+ Directory.Builder b = Directory.newBuilder();
+ for (TreeNode.ChildEntry entry : node.getChildEntries()) {
+ TreeNode child = entry.getChild();
+ if (child.isLeaf()) {
+ ActionInput input = child.getActionInput();
+ if (input instanceof VirtualActionInput) {
+ VirtualActionInput virtualInput = (VirtualActionInput) input;
+ Digest digest = Digests.computeDigest(virtualInput);
+ virtualInputDigestCache.put(virtualInput, digest);
+ // There may be multiple inputs with the same digest. In that case, we don't care which
+ // one we get back from the digestVirtualInputCache later.
+ digestVirtualInputCache.put(digest, virtualInput);
+ b.addFilesBuilder()
+ .setName(entry.getSegment())
+ .setDigest(digest)
+ .setIsExecutable(false);
+ } else {
+ b.addFilesBuilder()
+ .setName(entry.getSegment())
+ .setDigest(Digests.getDigestFromInputCache(input, inputFileCache))
+ .setIsExecutable(execRoot.getRelative(input.getExecPathString()).isExecutable());
+ }
} else {
- b.getFileMetadataBuilder()
- .setDigest(ContentDigests.getDigestFromInputCache(input, inputFileCache))
- .setExecutable(execRoot.getRelative(input.getExecPathString()).isExecutable());
- }
- } else {
- for (TreeNode.ChildEntry entry : node.getChildEntries()) {
- ContentDigest childDigest = treeNodeDigestCache.get(entry.getChild());
- Preconditions.checkState(childDigest != null);
- b.addChildBuilder().setPath(entry.getSegment()).setDigest(childDigest);
+ Digest childDigest = Preconditions.checkNotNull(treeNodeDigestCache.get(child));
+ b.addDirectoriesBuilder().setName(entry.getSegment()).setDigest(childDigest);
}
}
- fileNode = b.build();
- fileNodeCache.put(node, fileNode);
- ContentDigest digest = ContentDigests.computeDigest(fileNode);
+ directory = b.build();
+ directoryCache.put(node, directory);
+ Digest digest = Digests.computeDigest(directory);
treeNodeDigestCache.put(node, digest);
digestTreeNodeCache.put(digest, node);
}
- return fileNode;
+ return directory;
}
// Recursively traverses the tree, expanding and computing Merkle digests for nodes for which
// they have not yet been computed and cached.
public void computeMerkleDigests(TreeNode root) throws IOException {
synchronized (this) {
- if (fileNodeCache.get(root) != null) {
+ if (directoryCache.get(root) != null) {
// Strong assumption: the cache is valid, i.e. parent present implies children present.
return;
}
}
- if (root.isLeaf()) {
- ActionInput input = root.getActionInput();
- if (!(input instanceof VirtualActionInput)) {
- inputFileCache.getDigest(input);
- }
- } else {
+ if (!root.isLeaf()) {
for (TreeNode child : children(root)) {
computeMerkleDigests(child);
}
+ getOrComputeDirectory(root);
}
- getOrComputeFileNode(root);
}
/**
* Should only be used after computeMerkleDigests has been called on one of the node ancestors.
* Returns the precomputed digest.
*/
- public ContentDigest getMerkleDigest(TreeNode node) {
- return treeNodeDigestCache.get(node);
+ public Digest getMerkleDigest(TreeNode node) throws IOException {
+ return node.isLeaf()
+ ? actionInputToDigest(node.getActionInput())
+ : treeNodeDigestCache.get(node);
}
/**
* Returns the precomputed digests for both data and metadata. Should only be used after
* computeMerkleDigests has been called on one of the node ancestors.
*/
- public ImmutableCollection<ContentDigest> getAllDigests(TreeNode root) throws IOException {
- ImmutableSet.Builder<ContentDigest> digests = ImmutableSet.builder();
+ public ImmutableCollection<Digest> getAllDigests(TreeNode root) throws IOException {
+ ImmutableSet.Builder<Digest> digests = ImmutableSet.builder();
for (TreeNode node : descendants(root)) {
- digests.add(Preconditions.checkNotNull(treeNodeDigestCache.get(node)));
- if (node.isLeaf()) {
- digests.add(actionInputToDigest(node.getActionInput()));
- }
+ digests.add(
+ node.isLeaf()
+ ? actionInputToDigest(node.getActionInput())
+ : Preconditions.checkNotNull(treeNodeDigestCache.get(node)));
}
return digests.build();
}
- private ContentDigest actionInputToDigest(ActionInput input) throws IOException {
+ private Digest actionInputToDigest(ActionInput input) throws IOException {
if (input instanceof VirtualActionInput) {
return Preconditions.checkNotNull(virtualInputDigestCache.get(input));
}
- return ContentDigests.getDigestFromInputCache(input, inputFileCache);
+ return Digests.getDigestFromInputCache(input, inputFileCache);
}
/**
- * Serializes all of the subtree to the file node list. TODO(olaola): add a version that only
- * copies a part of the tree that we are interested in. Should only be used after
- * computeMerkleDigests has been called on one of the node ancestors.
+ * Serializes all of the subtree to a Directory list. TODO(olaola): add a version that only copies
+ * a part of the tree that we are interested in. Should only be used after computeMerkleDigests
+ * has been called on one of the node ancestors.
*/
// Note: this is not, strictly speaking, thread safe. If someone is deleting cached Merkle hashes
// while this is executing, it will trigger an exception. But I think this is WAI.
- public ImmutableList<FileNode> treeToFileNodes(TreeNode root) {
- ImmutableList.Builder<FileNode> fileNodes = ImmutableList.builder();
+ public ImmutableList<Directory> treeToDirectories(TreeNode root) {
+ ImmutableList.Builder<Directory> directories = ImmutableList.builder();
for (TreeNode node : descendants(root)) {
- fileNodes.add(Preconditions.checkNotNull(fileNodeCache.get(node)));
+ if (!node.isLeaf()) {
+ directories.add(Preconditions.checkNotNull(directoryCache.get(node)));
+ }
}
- return fileNodes.build();
+ return directories.build();
}
/**
* Should only be used on digests created by a call to computeMerkleDigests. Looks up ActionInputs
- * or FileNodes by cached digests and adds them to the lists.
+ * or Directory messages by cached digests and adds them to the lists.
*/
public void getDataFromDigests(
- Iterable<ContentDigest> digests, List<ActionInput> actionInputs, List<FileNode> nodes) {
- for (ContentDigest digest : digests) {
+ Iterable<Digest> digests, List<ActionInput> actionInputs, List<Directory> nodes) {
+ for (Digest digest : digests) {
TreeNode treeNode = digestTreeNodeCache.get(digest);
if (treeNode != null) {
- nodes.add(Preconditions.checkNotNull(fileNodeCache.get(treeNode)));
- } else {
- // If not there, it must be an ActionInput.
- ByteString hexDigest = ByteString.copyFromUtf8(ContentDigests.toHexString(digest));
+ nodes.add(Preconditions.checkNotNull(directoryCache.get(treeNode)));
+ } else { // If not there, it must be an ActionInput.
+ ByteString hexDigest = ByteString.copyFromUtf8(digest.getHash());
ActionInput input = inputFileCache.getInputFromDigest(hexDigest);
if (input == null) {
// ... or a VirtualActionInput.
diff --git a/src/main/protobuf/BUILD b/src/main/protobuf/BUILD
index 96a9394a04..d08393d17e 100644
--- a/src/main/protobuf/BUILD
+++ b/src/main/protobuf/BUILD
@@ -22,7 +22,6 @@ FILES = [
"android_deploy_info",
"apk_manifest",
"command_server",
- "remote_protocol",
]
[proto_library(
@@ -59,13 +58,6 @@ cc_grpc_library(
src = "command_server.proto",
)
-# TODO(olaola): add Golang support.
-java_grpc_library(
- name = "remote_protocol_java_grpc",
- srcs = [":remote_protocol_proto"],
- deps = [":remote_protocol_java_proto"],
-)
-
py_proto_library(
name = "build_pb_py",
srcs = ["build.proto"],
@@ -82,6 +74,5 @@ filegroup(
name = "dist_jars",
srcs = [s + "_java_proto_srcs" for s in FILES] + [
":command_server_java_grpc_srcs",
- ":remote_protocol_java_grpc_srcs",
],
)
diff --git a/src/main/protobuf/remote_protocol.proto b/src/main/protobuf/remote_protocol.proto
deleted file mode 100644
index 2a68643d1e..0000000000
--- a/src/main/protobuf/remote_protocol.proto
+++ /dev/null
@@ -1,329 +0,0 @@
-// Copyright 2015 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto3";
-
-package google.devtools.remote_execution;
-
-option java_package = "com.google.devtools.build.lib.remote";
-
-message ContentDigest {
- bytes digest = 1; // A hash of the contents of the file, blob, or tree node
- // (see below). The contents are digested using SHA1 (20 bytes).
- int64 size_bytes = 2;
- int32 version = 3; // Will be 0 by default, and increase if we ever change
- // the hash function used, etc.
- // And maybe other metadata later.
-}
-
-message FileMetadata {
- ContentDigest digest = 1;
- bool executable = 3;
-}
-
-// A Platform is defined by a set of opaque name-value pairs.
-message Platform {
- message Property {
- string name = 1;
- string value = 2;
- }
- // Entries are sorted by name,value to ensure a canonical form.
- repeated Property entry = 1;
-}
-
-// All the fields of this message have the following in common: they are
-// expected to define a result. All instructions to remote execution that do
-// not affect the result should be out of this message, because this message
-// will be used as the Execution Cache key.
-message Action {
- // The digest of the command (as an encoded proto blob).
- // Digested separately, because it is both long and reusable.
- ContentDigest command_digest = 1;
- // The Merkle digest of the root input directory.
- ContentDigest input_root_digest = 2;
- // Relative paths to the action's output files and directories.
- // The standard output and error stream contents are always returned
- // in addition (see ActionResult message).
- // Any unnamed output files will be discarded.
- // A path will be interpreted as a directory iff it contains a trailing
- // slash.
- // This should be sorted!
- repeated string output_path = 4;
- Platform platform = 5;
- // All other requirements that affect the result of the action should be
- // here as well.
-}
-
-message Command {
- // The argument vector. argv[0] is the binary.
- repeated string argv = 1;
- // A map of environment variables.
- // This is part of the command rather than Platform, because it does not
- // affect scheduling.
- message EnvironmentEntry {
- string variable = 1;
- string value = 2;
- }
- // Environment variables, sorted by variable.
- repeated EnvironmentEntry environment = 2;
- // Possibly other fields the describe the setup actions of the command.
-}
-
-message Output {
- string path = 1;
- // Yes, we shouldn't have to repeat paths here, but it should not be too
- // costly, and might help.
- // The actual output bodies will be stored in CAS.
- oneof content {
- // Populated for directory outputs only, contains the root FileNode digest.
- ContentDigest digest = 2;
- // Populated for file outputs only.
- FileMetadata file_metadata = 3;
- }
-}
-
-message ActionResult {
- // Relative paths to the action's output files and directories.
- // Any unnamed output files will be discarded.
- // A path will be interpreted as a directory iff it contains a trailing
- // slash.
- // This should be sorted by path.
- repeated Output output = 1;
- int32 return_code = 2;
- ContentDigest stdout_digest = 3;
- ContentDigest stderr_digest = 4;
-}
-
-// Status message shared by all CAS requests.
-message CasStatus {
- bool succeeded = 1; // Whether the requested action had server side errors
- // or not.
- enum ErrorCode {
- UNKNOWN = 0;
- INVALID_ARGUMENT = 1; // The client behaved incorrectly. error_detail should
- // have more information.
- MISSING_DIGEST = 2; // Missing a node on tree download.
- DIGEST_MISMATCH = 3; // Upload only error, when requested digest does not
- // match the server side computed one.
- NODE_PARSE_ERROR = 4; // Failed to parse digested data as node.
- // more errors...
- }
- ErrorCode error = 2;
- string error_detail = 3; // Human readable error.
- // These are a common part of the status for many CAS requests:
- repeated ContentDigest missing_digest = 4;
- repeated ContentDigest parse_failed_digest = 5; // Only relevant to trees.
-}
-
-service CasService {
- // Looks up given content keys in CAS, and returns success when found.
- // The single returned status will have the potentially missing digests,
- // which need to be re-uploaded.
- rpc Lookup(CasLookupRequest) returns (CasLookupReply) { }
- // Uploads a directory tree into CAS. Not streamed, because it is only tree
- // metadata.
- rpc UploadTreeMetadata(CasUploadTreeMetadataRequest) returns
- (CasUploadTreeMetadataReply) { }
- // Uploads data blob(s) into CAS.
- rpc UploadBlob(stream CasUploadBlobRequest) returns (CasUploadBlobReply) { }
- // Downoads a directory tree metadata from CAS.
- rpc DownloadTreeMetadata(CasDownloadTreeMetadataRequest) returns
- (CasDownloadTreeMetadataReply) { }
- // Downoads a directory tree from CAS. Returns the entire root directory.
- rpc DownloadTree(CasDownloadTreeRequest) returns (stream CasDownloadReply) { }
- // Downoads data blob(s) from CAS, returns them.
- rpc DownloadBlob(CasDownloadBlobRequest) returns (stream CasDownloadReply) { }
-}
-
-message FileNode {
- FileMetadata file_metadata = 1;
- message Child {
- string path = 1;
- ContentDigest digest = 2;
- }
- // The children should be sorted by path, and not have equal subdirectory
- // prefixes.
- repeated Child child = 2;
-}
-
-message CasLookupRequest {
- repeated ContentDigest digest = 1;
-}
-
-message CasLookupReply {
- CasStatus status = 1;
-}
-
-message CasUploadTreeMetadataRequest {
- repeated FileNode tree_node = 1;
-}
-
-message CasUploadTreeMetadataReply {
- CasStatus status = 1;
-}
-
-message CasDownloadTreeMetadataRequest {
- ContentDigest root = 1;
-}
-
-message CasDownloadTreeMetadataReply {
- CasStatus status = 1;
- repeated FileNode tree_node = 2;
-}
-
-message BlobChunk {
- ContentDigest digest = 1; // Present only in first chunk.
- int64 offset = 2;
- bytes data = 3;
-}
-
-// This will be used for batching files/blobs.
-message CasUploadBlobRequest {
- BlobChunk data = 1;
-}
-
-message CasUploadBlobReply {
- CasStatus status = 1;
-}
-
-message CasDownloadTreeRequest {
- ContentDigest root_digest = 1;
-}
-
-// This message is streamed.
-message CasDownloadReply {
- CasStatus status = 1;
- BlobChunk data = 2;
- // For trees, data is the entire root directory, zipped (for a single root).
- // For blobs, sequential chunks of multiple blobs.
-}
-
-message CasDownloadBlobRequest {
- repeated ContentDigest digest = 1;
-}
-
-service ExecutionCacheService {
- // Gets results of a cached action.
- rpc GetCachedResult(ExecutionCacheRequest) returns (ExecutionCacheReply) { }
- // Set results of a cached action. This requires reproducible builds on
- // connected machines!
- rpc SetCachedResult(ExecutionCacheSetRequest) returns
- (ExecutionCacheSetReply) {}
-}
-
-message ExecutionCacheRequest {
- ContentDigest action_digest = 1;
-}
-
-message ExecutionCacheStatus {
- // Whether the request had any server side errors. For a lookup (get result)
- // request, a true value means the result was found in the cache.
- bool succeeded = 1;
- enum ErrorCode {
- UNKNOWN = 0;
- MISSING_RESULT = 1; // The result was not found in the execution cache.
- UNSUPPORTED = 2; // The request is not supported by the server.
- // More server errors...
- }
- ErrorCode error = 2;
- string error_detail = 3; // Human readable error.
-}
-
-message ExecutionCacheReply {
- ExecutionCacheStatus status = 1;
- ActionResult result = 2;
-}
-
-message ExecutionCacheSetRequest {
- ContentDigest action_digest = 1;
- ActionResult result = 2;
-}
-
-message ExecutionCacheSetReply {
- ExecutionCacheStatus status = 1;
-}
-
-service ExecuteService {
- // Executes an action remotely.
- rpc Execute(ExecuteRequest) returns (stream ExecuteReply) { }
-}
-
-message ExecuteRequest {
- Action action = 1;
- bool accept_cached = 2;
- // Later will probably add previous attempt history, as it will be
- // useful for monitoring and probably scheduling as well.
- // These fields will be useful for scheduling, error reporting (e.g. disk
- // exceeded) and for log analysis.
- int32 total_input_file_count = 3;
- int64 total_input_file_bytes = 4;
- // Used for monitoring and scheduling.
- BuildInfo build_info = 5;
- // Timeout milliseconds for running this action.
- int64 timeout_millis = 6; // Maybe add io_timeout as well, per
- // Marc Antoine's suggestion.
- // Add other fields such as required cores, RAM, etc, that affect scheduling,
- // but not result, later. All the requirements that DO affect results should
- // be part of the Action.
-}
-
-message BuildInfo {
- string build_id = 1;
- // TBD, Fields used to identify a build action.
- // This will be useful for analysis purposes.
- // Note: we don't want to put any Bazel-specific fields in this.
-}
-
-message ExecutionStats {
-// TBD, will contain all the stats related to the execution:
-// time it took, resources, etc. Maybe will break into sub-messages
-// for various execution phases.
-}
-
-message ExecuteReply {
- ExecutionStatus status = 1;
- ActionResult result = 2;
- bool cached_result = 3; // Filled by the server on Execution Cache hit.
- ExecutionStats execution_stats = 4;
- CasStatus cas_error = 5; // A possible server-side CAS error, e.g. missing
- // inputs. The message contains the missing digests.
- // Later will introduce return AttemptHistory for monitoring and use
- // in requests.
-}
-
-message ExecutionStatus {
- bool executed = 1; // Whether the action was executed.
- bool succeeded = 2; // Whether the action succeeded.
- enum ErrorCode {
- UNKNOWN_ERROR = 0;
- MISSING_COMMAND = 1; // Missing command digest in CAS.
- MISSING_INPUT = 2; // Missing one of the inputs in CAS.
- DEADLINE_EXCEEDED = 3;
- EXEC_FAILED = 4; // Action returned non-zero.
- // Other server errors. Some of these errors are client-retriable, and some
- // not; will have to comment clearly what will happen on each error.
- }
- ErrorCode error = 3;
- string error_detail = 4;
- // These fields allow returning streaming statuses for the action progress.
- enum ActionStage {
- UNKNOWN_STAGE = 0;
- QUEUED = 1;
- EXECUTING = 2;
- FINISHED = 3;
- }
- ActionStage stage = 5;
- // Optionally will add more details pertaining to current stage, for example
- // time executing, or position in queue, etc.
-}
diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD
index 24e0835694..87c71572c2 100644
--- a/src/test/java/com/google/devtools/build/lib/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/BUILD
@@ -1068,8 +1068,6 @@ java_test(
"//src/main/java/com/google/devtools/build/lib/actions",
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/common/options",
- "//src/main/protobuf:remote_protocol_java_grpc",
- "//src/main/protobuf:remote_protocol_java_proto",
"//third_party:api_client",
"//third_party:guava",
"//third_party:junit4",
@@ -1077,6 +1075,13 @@ java_test(
"//third_party:truth",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
+ "@googleapis//:google_bytestream_bytestream_java_grpc",
+ "@googleapis//:google_bytestream_bytestream_java_proto",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
+ "@googleapis//:google_longrunning_operations_java_proto",
+ "@googleapis//:google_rpc_code_java_proto",
+ "@googleapis//:google_rpc_status_java_proto",
],
)
diff --git a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java
index c7edee6ed0..ce445f7d4e 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/CachedLocalSpawnRunnerTest.java
@@ -14,7 +14,9 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -35,9 +37,7 @@ import com.google.devtools.build.lib.exec.SpawnRunner;
import com.google.devtools.build.lib.exec.SpawnRunner.ProgressStatus;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionPolicy;
import com.google.devtools.build.lib.exec.util.FakeOwner;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -45,9 +45,10 @@ import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.ByteString;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.SortedMap;
import org.junit.Before;
@@ -55,110 +56,99 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.mockito.MockitoAnnotations.Mock;
/** Tests for {@link CachedLocalSpawnRunner}. */
@RunWith(JUnit4.class)
public class CachedLocalSpawnRunnerTest {
- private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = new ArtifactExpander() {
- @Override
- public void expand(Artifact artifact, Collection<? super Artifact> output) {
- output.add(artifact);
- }
- };
-
- private static final ContentDigest DIGEST_FOR_EMPTY = ContentDigests.computeDigest(new byte[0]);
+ private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER =
+ new ArtifactExpander() {
+ @Override
+ public void expand(Artifact artifact, Collection<? super Artifact> output) {
+ output.add(artifact);
+ }
+ };
private FileSystem fs;
private Path execRoot;
private SimpleSpawn simpleSpawn;
private FakeActionInputFileCache fakeFileCache;
-
+ @Mock private RemoteActionCache cache;
+ @Mock private SpawnRunner delegate;
+ CachedLocalSpawnRunner runner;
private FileOutErr outErr;
- private final SpawnExecutionPolicy simplePolicy = new SpawnExecutionPolicy() {
- @Override
- public void lockOutputFiles() throws InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ActionInputFileCache getActionInputFileCache() {
- return fakeFileCache;
- }
-
- @Override
- public long getTimeoutMillis() {
- return 0;
- }
-
- @Override
- public FileOutErr getFileOutErr() {
- return outErr;
- }
-
- @Override
- public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException {
- return new SpawnInputExpander(/*strict*/false)
- .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace");
- }
-
- @Override
- public void report(ProgressStatus state) {
- // TODO(ulfjack): Test that the right calls are made.
- }
- };
+ private final SpawnExecutionPolicy simplePolicy =
+ new SpawnExecutionPolicy() {
+ @Override
+ public void lockOutputFiles() throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ActionInputFileCache getActionInputFileCache() {
+ return fakeFileCache;
+ }
+
+ @Override
+ public long getTimeoutMillis() {
+ return 0;
+ }
+
+ @Override
+ public FileOutErr getFileOutErr() {
+ return outErr;
+ }
+
+ @Override
+ public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException {
+ return new SpawnInputExpander(/*strict*/ false)
+ .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace");
+ }
+
+ @Override
+ public void report(ProgressStatus state) {
+ // TODO(ulfjack): Test that the right calls are made.
+ }
+ };
@Before
public final void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
fs = new InMemoryFileSystem();
execRoot = fs.getPath("/exec/root");
FileSystemUtils.createDirectoryAndParents(execRoot);
fakeFileCache = new FakeActionInputFileCache(execRoot);
- simpleSpawn = new SimpleSpawn(
- new FakeOwner("Mnemonic", "Progress Message"),
- ImmutableList.of("/bin/echo", "Hi!"),
- ImmutableMap.of("VARIABLE", "value"),
- /*executionInfo=*/ImmutableMap.<String, String>of(),
- /*inputs=*/ImmutableList.of(ActionInputHelper.fromPath("input")),
- /*outputs=*/ImmutableList.<ActionInput>of(),
- ResourceSet.ZERO
- );
+ simpleSpawn =
+ new SimpleSpawn(
+ new FakeOwner("Mnemonic", "Progress Message"),
+ ImmutableList.of("/bin/echo", "Hi!"),
+ ImmutableMap.of("VARIABLE", "value"),
+ /*executionInfo=*/ ImmutableMap.<String, String>of(),
+ /*inputs=*/ ImmutableList.of(ActionInputHelper.fromPath("input")),
+ /*outputs=*/ ImmutableList.<ActionInput>of(),
+ ResourceSet.ZERO);
Path stdout = fs.getPath("/tmp/stdout");
Path stderr = fs.getPath("/tmp/stderr");
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
outErr = new FileOutErr(stdout, stderr);
- }
-
- private void scratch(ActionInput input, String content) throws IOException {
- Path inputFile = execRoot.getRelative(input.getExecPath());
- FileSystemUtils.writeContentAsLatin1(inputFile, content);
- fakeFileCache.setDigest(
- simpleSpawn.getInputFiles().get(0), ByteString.copyFrom(inputFile.getSHA1Digest()));
+ RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+ runner = new CachedLocalSpawnRunner(execRoot, options, cache, delegate);
+ fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz");
}
@SuppressWarnings("unchecked")
@Test
public void cacheHit() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- RemoteActionCache cache = Mockito.mock(RemoteActionCache.class);
- SpawnRunner delegate = Mockito.mock(SpawnRunner.class);
- CachedLocalSpawnRunner runner =
- new CachedLocalSpawnRunner(execRoot, options, cache, delegate);
- when(cache.getCachedActionResult(any(ActionKey.class)))
- .thenReturn(ActionResult.newBuilder().setReturnCode(0).build());
- when(cache.downloadBlobs(any(Iterable.class)))
- .thenReturn(ImmutableList.of(new byte[0], new byte[0]));
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
+ ActionResult actionResult = ActionResult.getDefaultInstance();
+ when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult);
SpawnResult result = runner.exec(simpleSpawn, simplePolicy);
- // We use verify to check that each method is called exactly once.
- // TODO(ulfjack): Check that we also call it with exactly the right parameters, not just any.
- verify(cache).getCachedActionResult(any(ActionKey.class));
- verify(cache).downloadAllResults(any(ActionResult.class), any(Path.class));
- verify(cache).downloadBlobs(any(Iterable.class));
+ // In line with state-based testing, we only verify calls that produce side effects.
+ verify(cache).downloadAllResults(actionResult, execRoot);
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.hasRecordedOutput()).isFalse();
@@ -168,34 +158,42 @@ public class CachedLocalSpawnRunnerTest {
@SuppressWarnings("unchecked")
@Test
public void cacheHitWithOutput() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- RemoteActionCache cache = Mockito.mock(RemoteActionCache.class);
- SpawnRunner delegate = Mockito.mock(SpawnRunner.class);
- CachedLocalSpawnRunner runner =
- new CachedLocalSpawnRunner(execRoot, options, cache, delegate);
- when(cache.getCachedActionResult(any(ActionKey.class)))
- .thenReturn(ActionResult.newBuilder().setReturnCode(0).build());
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
- byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
- byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8);
- ContentDigest stdOutDigest = ContentDigests.computeDigest(cacheStdOut);
- ContentDigest stdErrDigest = ContentDigests.computeDigest(cacheStdErr);
-
- ActionResult actionResult = ActionResult.newBuilder()
- .setReturnCode(0)
- .setStdoutDigest(stdOutDigest)
- .setStderrDigest(stdErrDigest)
- .build();
+ byte[] cacheStdOut = "stdout".getBytes(UTF_8);
+ byte[] cacheStdErr = "stderr".getBytes(UTF_8);
+ Digest stdOutDigest = Digests.computeDigest(cacheStdOut);
+ Digest stdErrDigest = Digests.computeDigest(cacheStdErr);
+
+ ActionResult actionResult =
+ ActionResult.newBuilder()
+ .setStdoutDigest(stdOutDigest)
+ .setStderrDigest(stdErrDigest)
+ .build();
when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult);
- when(cache.downloadBlobs(any(Iterable.class)))
- .thenReturn(ImmutableList.of(cacheStdOut, cacheStdErr));
+ when(cache.downloadBlob(stdOutDigest)).thenReturn(cacheStdOut);
+ when(cache.downloadBlob(stdErrDigest)).thenReturn(cacheStdErr);
SpawnResult result = runner.exec(simpleSpawn, simplePolicy);
- // We use verify to check that each method is called exactly once.
- verify(cache).getCachedActionResult(any(ActionKey.class));
- verify(cache).downloadAllResults(any(ActionResult.class), any(Path.class));
- verify(cache).downloadBlobs(any(Iterable.class));
+ // In line with state-based testing, we only verify calls that produce side effects.
+ verify(cache).downloadAllResults(actionResult, execRoot);
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
+ assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
+ assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void cacheHitWithOutputsInlined() throws Exception {
+ ActionResult actionResult =
+ ActionResult.newBuilder()
+ .setStdoutRaw(ByteString.copyFromUtf8("stdout"))
+ .setStderrRaw(ByteString.copyFromUtf8("stderr"))
+ .build();
+ when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(actionResult);
+
+ SpawnResult result = runner.exec(simpleSpawn, simplePolicy);
+ // In line with state-based testing, we only verify calls that produce side effects.
+ verify(cache).downloadAllResults(actionResult, execRoot);
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
@@ -205,33 +203,29 @@ public class CachedLocalSpawnRunnerTest {
@SuppressWarnings("unchecked")
@Test
public void cacheMiss() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- RemoteActionCache cache = Mockito.mock(RemoteActionCache.class);
- SpawnRunner delegate = Mockito.mock(SpawnRunner.class);
- CachedLocalSpawnRunner runner =
- new CachedLocalSpawnRunner(execRoot, options, cache, delegate);
- when(cache.getCachedActionResult(any(ActionKey.class)))
- .thenReturn(ActionResult.newBuilder().setReturnCode(0).build());
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
-
- when(cache.getCachedActionResult(any(ActionKey.class))).thenReturn(null);
- when(cache.uploadFileContents(any(Path.class))).thenReturn(DIGEST_FOR_EMPTY);
- SpawnResult delegateResult = new SpawnResult.Builder()
- .setExitCode(0)
- .setStatus(Status.SUCCESS)
- .build();
+ byte[] cacheStdOut = "stdout".getBytes(UTF_8);
+ byte[] cacheStdErr = "stderr".getBytes(UTF_8);
+ Digest stdOutDigest = Digests.computeDigest(cacheStdOut);
+ Digest stdErrDigest = Digests.computeDigest(cacheStdErr);
+ when(cache.uploadFileContents(any(Path.class))).thenReturn(stdErrDigest, stdOutDigest);
+ SpawnResult delegateResult =
+ new SpawnResult.Builder().setExitCode(0).setStatus(Status.SUCCESS).build();
when(delegate.exec(any(Spawn.class), any(SpawnExecutionPolicy.class)))
.thenReturn(delegateResult);
SpawnResult result = runner.exec(simpleSpawn, simplePolicy);
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
// We use verify to check that each method is called the correct number of times.
verify(cache)
.uploadAllResults(any(Path.class), any(Collection.class), any(ActionResult.Builder.class));
// Two additional uploads for stdout and stderr.
verify(cache, Mockito.times(2)).uploadFileContents(any(Path.class));
- verify(cache).setCachedActionResult(any(ActionKey.class), any(ActionResult.class));
- assertThat(result.setupSuccess()).isTrue();
- assertThat(result.exitCode()).isEqualTo(0);
+ ActionResult actionResult =
+ ActionResult.newBuilder()
+ .setStdoutDigest(stdOutDigest)
+ .setStderrDigest(stdErrDigest)
+ .build();
+ verify(cache).setCachedActionResult(any(ActionKey.class), eq(actionResult));
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
index 13be580204..b51b9d6259 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/ChunkerTest.java
@@ -17,9 +17,8 @@ import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import com.google.common.collect.ImmutableSet;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.protobuf.ByteString;
+import com.google.devtools.build.lib.remote.Chunker.Chunk;
+import com.google.devtools.remoteexecution.v1test.Digest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -28,37 +27,29 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class ChunkerTest {
- static BlobChunk buildChunk(long offset, String data) {
- return BlobChunk.newBuilder().setOffset(offset).setData(ByteString.copyFromUtf8(data)).build();
- }
-
- static BlobChunk buildChunk(ContentDigest digest, String data) {
- return BlobChunk.newBuilder().setDigest(digest).setData(ByteString.copyFromUtf8(data)).build();
- }
-
@Test
public void testChunker() throws Exception {
byte[] b1 = "abcdefg".getBytes(UTF_8);
byte[] b2 = "hij".getBytes(UTF_8);
byte[] b3 = "klmnopqrstuvwxyz".getBytes(UTF_8);
- ContentDigest d1 = ContentDigests.computeDigest(b1);
- ContentDigest d2 = ContentDigests.computeDigest(b2);
- ContentDigest d3 = ContentDigests.computeDigest(b3);
+ Digest d1 = Digests.computeDigest(b1);
+ Digest d2 = Digests.computeDigest(b2);
+ Digest d3 = Digests.computeDigest(b3);
Chunker c = new Chunker.Builder().chunkSize(5).addInput(b1).addInput(b2).addInput(b3).build();
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d1, "abcde"));
+ assertThat(c.next()).isEqualTo(new Chunk(d1, "abcde".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(5, "fg"));
+ assertThat(c.next()).isEqualTo(new Chunk(d1, "fg".getBytes(UTF_8), 5));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d2, "hij"));
+ assertThat(c.next()).isEqualTo(new Chunk(d2, "hij".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d3, "klmno"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "klmno".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(5, "pqrst"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "pqrst".getBytes(UTF_8), 5));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(10, "uvwxy"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "uvwxy".getBytes(UTF_8), 10));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(15, "z"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "z".getBytes(UTF_8), 15));
assertThat(c.hasNext()).isFalse();
}
@@ -68,8 +59,8 @@ public class ChunkerTest {
byte[] b2 = "bb".getBytes(UTF_8);
byte[] b3 = "ccc".getBytes(UTF_8);
byte[] b4 = "dddd".getBytes(UTF_8);
- ContentDigest d1 = ContentDigests.computeDigest(b1);
- ContentDigest d3 = ContentDigests.computeDigest(b3);
+ Digest d1 = Digests.computeDigest(b1);
+ Digest d3 = Digests.computeDigest(b3);
Chunker c =
new Chunker.Builder()
.chunkSize(2)
@@ -80,11 +71,11 @@ public class ChunkerTest {
.addInput(b4)
.build();
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d1, "a"));
+ assertThat(c.next()).isEqualTo(new Chunk(d1, "a".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(d3, "cc"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "cc".getBytes(UTF_8), 0));
assertThat(c.hasNext()).isTrue();
- assertThat(c.next()).isEqualTo(buildChunk(2, "c"));
+ assertThat(c.next()).isEqualTo(new Chunk(d3, "c".getBytes(UTF_8), 2));
assertThat(c.hasNext()).isFalse();
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java b/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java
index 74994aabdd..3193e0b66f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/FakeActionInputFileCache.java
@@ -20,7 +20,9 @@ import com.google.common.hash.HashCode;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.Artifact;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.remoteexecution.v1test.Digest;
import com.google.protobuf.ByteString;
import java.io.IOException;
import javax.annotation.Nullable;
@@ -28,20 +30,21 @@ import javax.annotation.Nullable;
/** A fake implementation of the {@link ActionInputFileCache} interface. */
final class FakeActionInputFileCache implements ActionInputFileCache {
private final Path execRoot;
- private final BiMap<ActionInput, ByteString> cas = HashBiMap.create();
+ private final BiMap<ActionInput, String> cas = HashBiMap.create();
FakeActionInputFileCache(Path execRoot) {
this.execRoot = execRoot;
}
- void setDigest(ActionInput input, ByteString digest) {
+ void setDigest(ActionInput input, String digest) {
cas.put(input, digest);
}
@Override
@Nullable
public byte[] getDigest(ActionInput input) throws IOException {
- return Preconditions.checkNotNull(cas.get(input), input).toByteArray();
+ String hexDigest = Preconditions.checkNotNull(cas.get(input), input);
+ return HashCode.fromString(hexDigest).asBytes();
}
@Override
@@ -63,12 +66,20 @@ final class FakeActionInputFileCache implements ActionInputFileCache {
@Nullable
public ActionInput getInputFromDigest(ByteString hexDigest) {
HashCode code = HashCode.fromString(hexDigest.toStringUtf8());
- ByteString digest = ByteString.copyFrom(code.asBytes());
- return Preconditions.checkNotNull(cas.inverse().get(digest));
+ return Preconditions.checkNotNull(cas.inverse().get(code.toString()));
}
@Override
public Path getInputPath(ActionInput input) {
throw new UnsupportedOperationException();
}
+
+ public Digest createScratchInput(ActionInput input, String content) throws IOException {
+ Path inputFile = execRoot.getRelative(input.getExecPath());
+ FileSystemUtils.createDirectoryAndParents(inputFile.getParentDirectory());
+ FileSystemUtils.writeContentAsLatin1(inputFile, content);
+ Digest digest = Digests.computeDigest(inputFile);
+ setDigest(input, digest.getHash());
+ return digest;
+ }
}
\ No newline at end of file
diff --git a/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java
new file mode 100644
index 0000000000..4de1498207
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/FakeImmutableCacheByteStreamImpl.java
@@ -0,0 +1,56 @@
+// Copyright 2017 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.common.collect.ImmutableMap;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.protobuf.ByteString;
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+
+class FakeImmutableCacheByteStreamImpl extends ByteStreamImplBase {
+ private final Map<ReadRequest, ReadResponse> cannedReplies;
+
+ public FakeImmutableCacheByteStreamImpl(Map<Digest, String> contents) {
+ ImmutableMap.Builder<ReadRequest, ReadResponse> b = ImmutableMap.builder();
+ for (Map.Entry<Digest, String> e : contents.entrySet()) {
+ b.put(
+ ReadRequest.newBuilder()
+ .setResourceName("blobs/" + e.getKey().getHash() + "/" + e.getKey().getSizeBytes())
+ .build(),
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8(e.getValue())).build());
+ }
+ cannedReplies = b.build();
+ }
+
+ public FakeImmutableCacheByteStreamImpl(Digest digest, String contents) {
+ this(ImmutableMap.of(digest, contents));
+ }
+
+ public FakeImmutableCacheByteStreamImpl(Digest d1, String c1, Digest d2, String c2) {
+ this(ImmutableMap.of(d1, c1, d2, c2));
+ }
+
+ @Override
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ assertThat(cannedReplies.containsKey(request)).isTrue();
+ responseObserver.onNext(cannedReplies.get(request));
+ responseObserver.onCompleted();
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java
index c72e819820..3cb2bd218c 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcActionCacheTest.java
@@ -15,78 +15,81 @@ package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.when;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.devtools.build.lib.actions.Root;
+import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
import com.google.devtools.build.lib.testutil.Scratch;
-import com.google.devtools.build.lib.util.Preconditions;
+import com.google.devtools.build.lib.vfs.FileSystem;
+import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.Path;
+import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
-import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-import org.mockito.MockitoAnnotations;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/** Tests for {@link GrpcActionCache}. */
@RunWith(JUnit4.class)
public class GrpcActionCacheTest {
- private final FakeRemoteCacheService fakeRemoteCacheService = new FakeRemoteCacheService();
-
- private final Server server =
- InProcessServerBuilder.forName(getClass().getSimpleName())
- .directExecutor()
- .addService(fakeRemoteCacheService)
- .build();
-
- private final ManagedChannel channel =
- InProcessChannelBuilder.forName(getClass().getSimpleName()).directExecutor().build();
- private Scratch scratch;
- private Root rootDir;
+ private FileSystem fs;
+ private Path execRoot;
+ private FakeActionInputFileCache fakeFileCache;
+ private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
+ private final String fakeServerName = "fake server for " + getClass();
+ private Server fakeServer;
@Before
public final void setUp() throws Exception {
- MockitoAnnotations.initMocks(this);
- scratch = new Scratch();
- rootDir = Root.asDerivedRoot(scratch.dir("/exec/root"));
- server.start();
+ // Use a mutable service registry for later registering the service impl for each test case.
+ fakeServer =
+ InProcessServerBuilder.forName(fakeServerName)
+ .fallbackHandlerRegistry(serviceRegistry)
+ .directExecutor()
+ .build()
+ .start();
+ Chunker.setDefaultChunkSizeForTesting(1000); // Enough for everything to be one chunk.
+ fs = new InMemoryFileSystem();
+ execRoot = fs.getPath("/exec/root");
+ FileSystemUtils.createDirectoryAndParents(execRoot);
+ fakeFileCache = new FakeActionInputFileCache(execRoot);
}
@After
- public void tearDown() {
- server.shutdownNow();
- channel.shutdownNow();
+ public void tearDown() throws Exception {
+ fakeServer.shutdownNow();
}
private static class ChannelOptionsInterceptor implements ClientInterceptor {
@@ -106,316 +109,332 @@ public class GrpcActionCacheTest {
}
private GrpcActionCache newClient() throws IOException {
- return newClient(Options.getDefaults(RemoteOptions.class),
- Options.getDefaults(AuthAndTLSOptions.class));
- }
+ AuthAndTLSOptions authTlsOptions = Options.getDefaults(AuthAndTLSOptions.class);
+ authTlsOptions.authEnabled = true;
+ authTlsOptions.authCredentials = "/exec/root/creds.json";
+ authTlsOptions.authScope = "dummy.scope";
+
+ GenericJson json = new GenericJson();
+ json.put("type", "authorized_user");
+ json.put("client_id", "some_client");
+ json.put("client_secret", "foo");
+ json.put("refresh_token", "bar");
+ Scratch scratch = new Scratch();
+ scratch.file(authTlsOptions.authCredentials, new JacksonFactory().toString(json));
- private GrpcActionCache newClient(RemoteOptions remoteOptions, AuthAndTLSOptions authTlsOptions)
- throws IOException {
ChannelOptions channelOptions =
- authTlsOptions.authCredentials != null
- ? ChannelOptions.create(
- authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes,
- scratch.resolve(authTlsOptions.authCredentials).getInputStream())
- : ChannelOptions.create(authTlsOptions, remoteOptions.grpcMaxChunkSizeBytes);
+ ChannelOptions.create(
+ authTlsOptions, scratch.resolve(authTlsOptions.authCredentials).getInputStream());
return new GrpcActionCache(
ClientInterceptors.intercept(
- channel, ImmutableList.of(new ChannelOptionsInterceptor(channelOptions))),
- remoteOptions,
- channelOptions);
+ InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(),
+ ImmutableList.of(new ChannelOptionsInterceptor(channelOptions))),
+ channelOptions,
+ Options.getDefaults(RemoteOptions.class));
}
@Test
- public void testDownloadEmptyBlobs() throws Exception {
+ public void testDownloadEmptyBlob() throws Exception {
GrpcActionCache client = newClient();
- ContentDigest fooDigest = fakeRemoteCacheService.put("foo".getBytes(UTF_8));
- ContentDigest emptyDigest = ContentDigests.computeDigest(new byte[0]);
- ImmutableList<byte[]> results =
- client.downloadBlobs(ImmutableList.<ContentDigest>of(emptyDigest, fooDigest, emptyDigest));
- // Will not query the server for empty blobs.
- assertThat(new String(results.get(0), UTF_8)).isEmpty();
- assertThat(new String(results.get(1), UTF_8)).isEqualTo("foo");
- assertThat(new String(results.get(2), UTF_8)).isEmpty();
- // Will not call the server at all.
- assertThat(new String(client.downloadBlob(emptyDigest), UTF_8)).isEmpty();
+ Digest emptyDigest = Digests.computeDigest(new byte[0]);
+ // Will not call the mock Bytestream interface at all.
+ assertThat(client.downloadBlob(emptyDigest)).isEmpty();
}
@Test
- public void testDownloadBlobs() throws Exception {
- GrpcActionCache client = newClient();
- ContentDigest fooDigest = fakeRemoteCacheService.put("foo".getBytes(UTF_8));
- ContentDigest barDigest = fakeRemoteCacheService.put("bar".getBytes(UTF_8));
- ImmutableList<byte[]> results =
- client.downloadBlobs(ImmutableList.<ContentDigest>of(fooDigest, barDigest));
- assertThat(new String(results.get(0), UTF_8)).isEqualTo("foo");
- assertThat(new String(results.get(1), UTF_8)).isEqualTo("bar");
+ public void testDownloadBlobSingleChunk() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest digest = Digests.computeDigestUtf8("abcdefg");
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ assertThat(request.getResourceName().contains(digest.getHash())).isTrue();
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abcdefg")).build());
+ responseObserver.onCompleted();
+ }
+ });
+ assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
}
@Test
- public void testDownloadBlobsBatchChunk() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- options.grpcMaxBatchInputs = 10;
- options.grpcMaxChunkSizeBytes = 2;
- options.grpcMaxBatchSizeBytes = 10;
- options.remoteTimeout = 10;
- GrpcActionCache client = newClient(options, Options.getDefaults(AuthAndTLSOptions.class));
- ContentDigest fooDigest = fakeRemoteCacheService.put("fooooooo".getBytes(UTF_8));
- ContentDigest barDigest = fakeRemoteCacheService.put("baaaar".getBytes(UTF_8));
- ContentDigest s1Digest = fakeRemoteCacheService.put("1".getBytes(UTF_8));
- ContentDigest s2Digest = fakeRemoteCacheService.put("2".getBytes(UTF_8));
- ContentDigest s3Digest = fakeRemoteCacheService.put("3".getBytes(UTF_8));
- ImmutableList<byte[]> results =
- client.downloadBlobs(
- ImmutableList.<ContentDigest>of(fooDigest, barDigest, s1Digest, s2Digest, s3Digest));
- assertThat(new String(results.get(0), UTF_8)).isEqualTo("fooooooo");
- assertThat(new String(results.get(1), UTF_8)).isEqualTo("baaaar");
- assertThat(new String(results.get(2), UTF_8)).isEqualTo("1");
- assertThat(new String(results.get(3), UTF_8)).isEqualTo("2");
- assertThat(new String(results.get(4), UTF_8)).isEqualTo("3");
+ public void testDownloadBlobMultipleChunks() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest digest = Digests.computeDigestUtf8("abcdefg");
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ assertThat(request.getResourceName().contains(digest.getHash())).isTrue();
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("abc")).build());
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("def")).build());
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFromUtf8("g")).build());
+ responseObserver.onCompleted();
+ }
+ });
+ assertThat(new String(client.downloadBlob(digest), UTF_8)).isEqualTo("abcdefg");
}
@Test
- public void testUploadBlobs() throws Exception {
+ public void testDownloadAllResults() throws Exception {
GrpcActionCache client = newClient();
- byte[] foo = "foo".getBytes(UTF_8);
- byte[] bar = "bar".getBytes(UTF_8);
- ContentDigest fooDigest = ContentDigests.computeDigest(foo);
- ContentDigest barDigest = ContentDigests.computeDigest(bar);
- ImmutableList<ContentDigest> digests = client.uploadBlobs(ImmutableList.<byte[]>of(foo, bar));
- assertThat(digests).containsExactly(fooDigest, barDigest);
- assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo);
- assertThat(fakeRemoteCacheService.get(barDigest)).isEqualTo(bar);
- }
+ Digest fooDigest = Digests.computeDigestUtf8("foo-contents");
+ Digest barDigest = Digests.computeDigestUtf8("bar-contents");
+ Digest emptyDigest = Digests.computeDigest(new byte[0]);
+ serviceRegistry.addService(
+ new FakeImmutableCacheByteStreamImpl(fooDigest, "foo-contents", barDigest, "bar-contents"));
- @Test
- public void testUploadBlobsBatchChunk() throws Exception {
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- options.grpcMaxBatchInputs = 10;
- options.grpcMaxChunkSizeBytes = 2;
- options.grpcMaxBatchSizeBytes = 10;
- options.remoteTimeout = 10;
- GrpcActionCache client = newClient(options, Options.getDefaults(AuthAndTLSOptions.class));
-
- byte[] foo = "fooooooo".getBytes(UTF_8);
- byte[] bar = "baaaar".getBytes(UTF_8);
- byte[] s1 = "1".getBytes(UTF_8);
- byte[] s2 = "2".getBytes(UTF_8);
- byte[] s3 = "3".getBytes(UTF_8);
- ContentDigest fooDigest = ContentDigests.computeDigest(foo);
- ContentDigest barDigest = ContentDigests.computeDigest(bar);
- ContentDigest s1Digest = ContentDigests.computeDigest(s1);
- ContentDigest s2Digest = ContentDigests.computeDigest(s2);
- ContentDigest s3Digest = ContentDigests.computeDigest(s3);
- ImmutableList<ContentDigest> digests =
- client.uploadBlobs(ImmutableList.<byte[]>of(foo, bar, s1, s2, s3));
- assertThat(digests).containsExactly(fooDigest, barDigest, s1Digest, s2Digest, s3Digest);
- assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo);
- assertThat(fakeRemoteCacheService.get(barDigest)).isEqualTo(bar);
- assertThat(fakeRemoteCacheService.get(s1Digest)).isEqualTo(s1);
- assertThat(fakeRemoteCacheService.get(s2Digest)).isEqualTo(s2);
- assertThat(fakeRemoteCacheService.get(s3Digest)).isEqualTo(s3);
- }
-
- @Test
- public void testUploadAllResults() throws Exception {
- GrpcActionCache client = newClient();
- byte[] foo = "foo".getBytes(UTF_8);
- byte[] bar = "bar".getBytes(UTF_8);
- Path fooFile = scratch.file("/exec/root/a/foo", foo);
- Path emptyFile = scratch.file("/exec/root/b/empty");
- Path barFile = scratch.file("/exec/root/a/bar", bar);
- ContentDigest fooDigest = ContentDigests.computeDigest(fooFile);
- ContentDigest barDigest = ContentDigests.computeDigest(barFile);
- ContentDigest emptyDigest = ContentDigests.computeDigest(new byte[0]);
ActionResult.Builder result = ActionResult.newBuilder();
- client.uploadAllResults(
- rootDir.getPath(), ImmutableList.<Path>of(fooFile, emptyFile, barFile), result);
- assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo);
- assertThat(fakeRemoteCacheService.get(barDigest)).isEqualTo(bar);
- ActionResult.Builder expectedResult = ActionResult.newBuilder();
- expectedResult
- .addOutputBuilder()
- .setPath("a/foo")
- .getFileMetadataBuilder()
- .setDigest(fooDigest);
- expectedResult
- .addOutputBuilder()
- .setPath("b/empty")
- .getFileMetadataBuilder()
- .setDigest(emptyDigest);
- expectedResult
- .addOutputBuilder()
- .setPath("a/bar")
- .getFileMetadataBuilder()
- .setDigest(barDigest);
- assertThat(result.build()).isEqualTo(expectedResult.build());
+ result.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
+ result.addOutputFilesBuilder().setPath("b/empty").setDigest(emptyDigest);
+ result.addOutputFilesBuilder().setPath("a/bar").setDigest(barDigest).setIsExecutable(true);
+ client.downloadAllResults(result.build(), execRoot);
+ assertThat(Digests.computeDigest(execRoot.getRelative("a/foo"))).isEqualTo(fooDigest);
+ assertThat(Digests.computeDigest(execRoot.getRelative("b/empty"))).isEqualTo(emptyDigest);
+ assertThat(Digests.computeDigest(execRoot.getRelative("a/bar"))).isEqualTo(barDigest);
+ assertThat(execRoot.getRelative("a/bar").isExecutable()).isTrue();
}
@Test
- public void testDownloadAllResults() throws Exception {
- GrpcActionCache client = newClient();
- ContentDigest fooDigest = fakeRemoteCacheService.put("foo".getBytes(UTF_8));
- ContentDigest barDigest = fakeRemoteCacheService.put("bar".getBytes(UTF_8));
- ContentDigest emptyDigest = ContentDigests.computeDigest(new byte[0]);
- ActionResult.Builder result = ActionResult.newBuilder();
- result.addOutputBuilder().setPath("a/foo").getFileMetadataBuilder().setDigest(fooDigest);
- result.addOutputBuilder().setPath("b/empty").getFileMetadataBuilder().setDigest(emptyDigest);
- result.addOutputBuilder().setPath("a/bar").getFileMetadataBuilder().setDigest(barDigest);
- client.downloadAllResults(result.build(), rootDir.getPath());
- Path fooFile = rootDir.getPath().getRelative("a/foo");
- Path emptyFile = rootDir.getPath().getRelative("b/empty");
- Path barFile = rootDir.getPath().getRelative("a/bar");
- assertThat(ContentDigests.computeDigest(fooFile)).isEqualTo(fooDigest);
- assertThat(ContentDigests.computeDigest(emptyFile)).isEqualTo(emptyDigest);
- assertThat(ContentDigests.computeDigest(barFile)).isEqualTo(barDigest);
+ public void testUploadBlobCacheHit() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest digest = Digests.computeDigestUtf8("abcdefg");
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+ });
+ assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
}
@Test
- public void testAuthCredentials() throws Exception {
- AuthAndTLSOptions options = Options.getDefaults(AuthAndTLSOptions.class);
- options.authEnabled = true;
- options.authCredentials = "/exec/root/creds.json";
- options.authScope = "dummy.scope";
-
- GenericJson json = new GenericJson();
- json.put("type", "authorized_user");
- json.put("client_id", "some_client");
- json.put("client_secret", "foo");
- json.put("refresh_token", "bar");
- scratch.file(options.authCredentials, new JacksonFactory().toString(json));
-
- GrpcActionCache client = newClient(Options.getDefaults(RemoteOptions.class), options);
- byte[] foo = "foo".getBytes(UTF_8);
- ContentDigest fooDigest = ContentDigests.computeDigest(foo);
- ImmutableList<ContentDigest> digests = client.uploadBlobs(ImmutableList.<byte[]>of(foo));
- assertThat(digests).containsExactly(fooDigest);
- assertThat(fakeRemoteCacheService.get(fooDigest)).isEqualTo(foo);
+ public void testUploadBlobSingleChunk() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest digest = Digests.computeDigestUtf8("abcdefg");
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build());
+ responseObserver.onCompleted();
+ }
+ });
+ serviceRegistry.addService(
+ new ByteStreamImplBase() {
+ @Override
+ public StreamObserver<WriteRequest> write(
+ final StreamObserver<WriteResponse> responseObserver) {
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest request) {
+ assertThat(request.getResourceName()).contains(digest.getHash());
+ assertThat(request.getFinishWrite()).isTrue();
+ assertThat(request.getData().toStringUtf8()).isEqualTo("abcdefg");
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(7).build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ fail("An error occurred: " + t);
+ }
+ };
+ }
+ });
+ assertThat(client.uploadBlob("abcdefg".getBytes(UTF_8))).isEqualTo(digest);
}
- private static class FakeRemoteCacheService extends CasServiceImplBase {
- private final ConcurrentMap<String, byte[]> cache = Maps.newConcurrentMap();
-
- public ContentDigest put(byte[] blob) {
- ContentDigest digest = ContentDigests.computeDigest(blob);
- cache.put(ContentDigests.toHexString(digest), blob);
- return digest;
+ static class TestChunkedRequestObserver implements StreamObserver<WriteRequest> {
+ private final StreamObserver<WriteResponse> responseObserver;
+ private final String contents;
+ private Chunker chunker;
+
+ public TestChunkedRequestObserver(
+ StreamObserver<WriteResponse> responseObserver, String contents, int chunkSizeBytes) {
+ this.responseObserver = responseObserver;
+ this.contents = contents;
+ try {
+ chunker = Chunker.from(contents.getBytes(UTF_8), chunkSizeBytes);
+ } catch (IOException e) {
+ fail("An error occurred:" + e);
+ }
}
- public byte[] get(ContentDigest digest) {
- return cache.get(ContentDigests.toHexString(digest));
+ @Override
+ public void onNext(WriteRequest request) {
+ assertThat(chunker.hasNext()).isTrue();
+ try {
+ Chunker.Chunk chunk = chunker.next();
+ Digest digest = chunk.getDigest();
+ long offset = chunk.getOffset();
+ byte[] data = chunk.getData();
+ if (offset == 0) {
+ assertThat(request.getResourceName()).contains(digest.getHash());
+ } else {
+ assertThat(request.getResourceName()).isEmpty();
+ }
+ assertThat(request.getFinishWrite())
+ .isEqualTo(offset + data.length == digest.getSizeBytes());
+ assertThat(request.getData().toByteArray()).isEqualTo(data);
+ } catch (IOException e) {
+ fail("An error occurred:" + e);
+ }
}
- public void clear() {
- cache.clear();
+ @Override
+ public void onCompleted() {
+ assertThat(chunker.hasNext()).isFalse();
+ responseObserver.onNext(
+ WriteResponse.newBuilder().setCommittedSize(contents.length()).build());
+ responseObserver.onCompleted();
}
@Override
- public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> observer) {
- CasLookupReply.Builder reply = CasLookupReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- for (ContentDigest digest : request.getDigestList()) {
- if (get(digest) == null) {
- status.addMissingDigest(digest);
- }
- }
- status.setSucceeded(true);
- observer.onNext(reply.build());
- observer.onCompleted();
+ public void onError(Throwable t) {
+ fail("An error occurred: " + t);
}
+ }
- @Override
- public void downloadBlob(
- CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> observer) {
- CasDownloadReply.Builder reply = CasDownloadReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- boolean success = true;
- for (ContentDigest digest : request.getDigestList()) {
- if (get(digest) == null) {
- status.addMissingDigest(digest);
- success = false;
- }
- }
- if (!success) {
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- status.setSucceeded(false);
- observer.onNext(reply.build());
- observer.onCompleted();
- return;
+ private Answer<StreamObserver<WriteRequest>> blobChunkedWriteAnswer(
+ final String contents, final int chunkSize) {
+ return new Answer<StreamObserver<WriteRequest>>() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) {
+ return new TestChunkedRequestObserver(
+ (StreamObserver<WriteResponse>) invocation.getArguments()[0], contents, chunkSize);
}
- // We change the order on purpose, to test for blobs out of order:
- for (ContentDigest digest : Lists.reverse(request.getDigestList())) {
- observer.onNext(
- CasDownloadReply.newBuilder()
- .setStatus(CasStatus.newBuilder().setSucceeded(true))
- .setData(
- BlobChunk.newBuilder()
- .setDigest(digest)
- .setData(ByteString.copyFrom(get(digest))))
- .build());
- }
- observer.onCompleted();
+ };
+ }
+
+ @Test
+ public void testUploadBlobMultipleChunks() throws Exception {
+ final Digest digest = Digests.computeDigestUtf8("abcdef");
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder().addMissingBlobDigests(digest).build());
+ responseObserver.onCompleted();
+ }
+ });
+
+ ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
+ serviceRegistry.addService(mockByteStreamImpl);
+ for (int chunkSize = 1; chunkSize <= 6; ++chunkSize) {
+ GrpcActionCache client = newClient();
+ Chunker.setDefaultChunkSizeForTesting(chunkSize);
+ when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
+ .thenAnswer(blobChunkedWriteAnswer("abcdef", chunkSize));
+ assertThat(client.uploadBlob("abcdef".getBytes(UTF_8))).isEqualTo(digest);
}
+ }
- @Override
- public StreamObserver<CasUploadBlobRequest> uploadBlob(
- final StreamObserver<CasUploadBlobReply> responseObserver) {
- return new StreamObserver<CasUploadBlobRequest>() {
- byte[] blob = null;
- ContentDigest digest = null;
- long offset = 0;
-
- @Override
- public void onNext(CasUploadBlobRequest request) {
- BlobChunk chunk = request.getData();
- try {
- if (chunk.hasDigest()) {
- // Check if the previous chunk was really done.
- Preconditions.checkArgument(
- digest == null || offset == 0,
- "Missing input chunk for digest %s",
- digest == null ? "" : ContentDigests.toString(digest));
- digest = chunk.getDigest();
- blob = new byte[(int) digest.getSizeBytes()];
- }
- Preconditions.checkArgument(digest != null, "First chunk contains no digest");
- Preconditions.checkArgument(
- offset == chunk.getOffset(),
- "Missing input chunk for digest %s",
- ContentDigests.toString(digest));
- if (digest.getSizeBytes() > 0) {
- chunk.getData().copyTo(blob, (int) offset);
- offset = (offset + chunk.getData().size()) % digest.getSizeBytes();
- }
- if (offset == 0) {
- ContentDigest uploadedDigest = put(blob);
- Preconditions.checkArgument(
- uploadedDigest.equals(digest),
- "Digest mismatch: client sent %s, server computed %s",
- ContentDigests.toString(digest),
- ContentDigests.toString(uploadedDigest));
- }
- } catch (Exception e) {
- CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(
- e instanceof IllegalArgumentException
- ? CasStatus.ErrorCode.INVALID_ARGUMENT
- : CasStatus.ErrorCode.UNKNOWN)
- .setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
+ @Test
+ public void testUploadAllResultsCacheHits() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest fooDigest =
+ fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
+ final Digest barDigest =
+ fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x");
+ final Path fooFile = execRoot.getRelative("a/foo");
+ final Path barFile = execRoot.getRelative("bar");
+ barFile.setExecutable(true);
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ assertThat(request)
+ .isEqualTo(
+ FindMissingBlobsRequest.newBuilder()
+ .addBlobDigests(fooDigest)
+ .addBlobDigests(barDigest)
+ .build());
+ // Nothing is missing.
+ responseObserver.onNext(FindMissingBlobsResponse.getDefaultInstance());
+ responseObserver.onCompleted();
}
- }
+ });
- @Override
- public void onError(Throwable t) {}
+ ActionResult.Builder result = ActionResult.newBuilder();
+ client.uploadAllResults(execRoot, ImmutableList.<Path>of(fooFile, barFile), result);
+ ActionResult.Builder expectedResult = ActionResult.newBuilder();
+ expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
+ expectedResult
+ .addOutputFilesBuilder()
+ .setPath("bar")
+ .setDigest(barDigest)
+ .setIsExecutable(true);
+ assertThat(result.build()).isEqualTo(expectedResult.build());
+ }
- @Override
- public void onCompleted() {
- responseObserver.onCompleted();
- }
- };
- }
+ @Test
+ public void testUploadAllResultsCacheMisses() throws Exception {
+ final GrpcActionCache client = newClient();
+ final Digest fooDigest =
+ fakeFileCache.createScratchInput(ActionInputHelper.fromPath("a/foo"), "xyz");
+ final Digest barDigest =
+ fakeFileCache.createScratchInput(ActionInputHelper.fromPath("bar"), "x");
+ final Path fooFile = execRoot.getRelative("a/foo");
+ final Path barFile = execRoot.getRelative("bar");
+ barFile.setExecutable(true);
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ assertThat(request)
+ .isEqualTo(
+ FindMissingBlobsRequest.newBuilder()
+ .addBlobDigests(fooDigest)
+ .addBlobDigests(barDigest)
+ .build());
+ // Both are missing.
+ responseObserver.onNext(
+ FindMissingBlobsResponse.newBuilder()
+ .addMissingBlobDigests(fooDigest)
+ .addMissingBlobDigests(barDigest)
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
+ ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
+ serviceRegistry.addService(mockByteStreamImpl);
+ when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
+ .thenAnswer(blobChunkedWriteAnswer("xyz", 3))
+ .thenAnswer(blobChunkedWriteAnswer("x", 1));
+
+ ActionResult.Builder result = ActionResult.newBuilder();
+ client.uploadAllResults(execRoot, ImmutableList.<Path>of(fooFile, barFile), result);
+ ActionResult.Builder expectedResult = ActionResult.newBuilder();
+ expectedResult.addOutputFilesBuilder().setPath("a/foo").setDigest(fooDigest);
+ expectedResult
+ .addOutputFilesBuilder()
+ .setPath("bar")
+ .setDigest(barDigest)
+ .setIsExecutable(true);
+ assertThat(result.build()).isEqualTo(expectedResult.build());
}
}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
index fc195ab3de..c648fae09f 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutionClientTest.java
@@ -14,12 +14,16 @@
package com.google.devtools.build.lib.remote;
import static com.google.common.truth.Truth.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.verify;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.ActionInputHelper;
@@ -32,14 +36,6 @@ import com.google.devtools.build.lib.exec.SpawnResult;
import com.google.devtools.build.lib.exec.SpawnRunner.ProgressStatus;
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionPolicy;
import com.google.devtools.build.lib.exec.util.FakeOwner;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
@@ -47,118 +43,154 @@ import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem;
import com.google.devtools.common.options.Options;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.Command;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
+import com.google.longrunning.Operation;
+import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
+import io.grpc.Channel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.protobuf.StatusProto;
+import io.grpc.stub.StreamObserver;
+import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
-import java.nio.charset.StandardCharsets;
import java.util.Collection;
+import java.util.Set;
import java.util.SortedMap;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/** Tests for {@link RemoteSpawnRunner} in combination with {@link GrpcRemoteExecutor}. */
@RunWith(JUnit4.class)
public class GrpcRemoteExecutionClientTest {
- private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER = new ArtifactExpander() {
- @Override
- public void expand(Artifact artifact, Collection<? super Artifact> output) {
- output.add(artifact);
- }
- };
+ private static final ArtifactExpander SIMPLE_ARTIFACT_EXPANDER =
+ new ArtifactExpander() {
+ @Override
+ public void expand(Artifact artifact, Collection<? super Artifact> output) {
+ output.add(artifact);
+ }
+ };
+ private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry();
private FileSystem fs;
private Path execRoot;
private SimpleSpawn simpleSpawn;
private FakeActionInputFileCache fakeFileCache;
-
+ private Digest inputDigest;
+ private RemoteSpawnRunner client;
private FileOutErr outErr;
- private long timeoutMillis = 0;
-
- private final SpawnExecutionPolicy simplePolicy = new SpawnExecutionPolicy() {
- @Override
- public void lockOutputFiles() throws InterruptedException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public ActionInputFileCache getActionInputFileCache() {
- return fakeFileCache;
- }
-
- @Override
- public long getTimeoutMillis() {
- return timeoutMillis;
- }
-
- @Override
- public FileOutErr getFileOutErr() {
- return outErr;
- }
-
- @Override
- public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException {
- return new SpawnInputExpander(/*strict*/false)
- .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace");
- }
-
- @Override
- public void report(ProgressStatus state) {
- // TODO(ulfjack): Test that the right calls are made.
- }
- };
+ private Server fakeServer;
+
+ private final SpawnExecutionPolicy simplePolicy =
+ new SpawnExecutionPolicy() {
+ @Override
+ public void lockOutputFiles() throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ActionInputFileCache getActionInputFileCache() {
+ return fakeFileCache;
+ }
+
+ @Override
+ public long getTimeoutMillis() {
+ return 0;
+ }
+
+ @Override
+ public FileOutErr getFileOutErr() {
+ return outErr;
+ }
+
+ @Override
+ public SortedMap<PathFragment, ActionInput> getInputMapping() throws IOException {
+ return new SpawnInputExpander(/*strict*/ false)
+ .getInputMapping(simpleSpawn, SIMPLE_ARTIFACT_EXPANDER, fakeFileCache, "workspace");
+ }
+
+ @Override
+ public void report(ProgressStatus state) {
+ // TODO(ulfjack): Test that the right calls are made.
+ }
+ };
@Before
public final void setUp() throws Exception {
+ String fakeServerName = "fake server for " + getClass();
+ // Use a mutable service registry for later registering the service impl for each test case.
+ fakeServer =
+ InProcessServerBuilder.forName(fakeServerName)
+ .fallbackHandlerRegistry(serviceRegistry)
+ .directExecutor()
+ .build()
+ .start();
+
+ Chunker.setDefaultChunkSizeForTesting(1000); // Enough for everything to be one chunk.
fs = new InMemoryFileSystem();
execRoot = fs.getPath("/exec/root");
FileSystemUtils.createDirectoryAndParents(execRoot);
fakeFileCache = new FakeActionInputFileCache(execRoot);
- simpleSpawn = new SimpleSpawn(
- new FakeOwner("Mnemonic", "Progress Message"),
- ImmutableList.of("/bin/echo", "Hi!"),
- ImmutableMap.of("VARIABLE", "value"),
- /*executionInfo=*/ImmutableMap.<String, String>of(),
- /*inputs=*/ImmutableList.of(ActionInputHelper.fromPath("input")),
- /*outputs=*/ImmutableList.<ActionInput>of(),
- ResourceSet.ZERO
- );
+ simpleSpawn =
+ new SimpleSpawn(
+ new FakeOwner("Mnemonic", "Progress Message"),
+ ImmutableList.of("/bin/echo", "Hi!"),
+ ImmutableMap.of("VARIABLE", "value"),
+ /*executionInfo=*/ ImmutableMap.<String, String>of(),
+ /*inputs=*/ ImmutableList.of(ActionInputHelper.fromPath("input")),
+ /*outputs=*/ ImmutableList.<ActionInput>of(),
+ ResourceSet.ZERO);
Path stdout = fs.getPath("/tmp/stdout");
Path stderr = fs.getPath("/tmp/stderr");
FileSystemUtils.createDirectoryAndParents(stdout.getParentDirectory());
FileSystemUtils.createDirectoryAndParents(stderr.getParentDirectory());
outErr = new FileOutErr(stdout, stderr);
+ RemoteOptions options = Options.getDefaults(RemoteOptions.class);
+ Channel channel = InProcessChannelBuilder.forName(fakeServerName).directExecutor().build();
+ GrpcRemoteExecutor executor = new GrpcRemoteExecutor(channel, ChannelOptions.DEFAULT, options);
+ GrpcActionCache remoteCache = new GrpcActionCache(channel, ChannelOptions.DEFAULT, options);
+ client = new RemoteSpawnRunner(execRoot, options, executor, remoteCache);
+ inputDigest = fakeFileCache.createScratchInput(simpleSpawn.getInputFiles().get(0), "xyz");
}
- private void scratch(ActionInput input, String content) throws IOException {
- Path inputFile = execRoot.getRelative(input.getExecPath());
- FileSystemUtils.writeContentAsLatin1(inputFile, content);
- fakeFileCache.setDigest(
- simpleSpawn.getInputFiles().get(0), ByteString.copyFrom(inputFile.getSHA1Digest()));
+ @After
+ public void tearDown() throws Exception {
+ fakeServer.shutdownNow();
}
@Test
public void cacheHit() throws Exception {
- GrpcCasInterface casIface = Mockito.mock(GrpcCasInterface.class);
- GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class);
- GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class);
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- GrpcRemoteExecutor executor =
- new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
- RemoteSpawnRunner client = new RemoteSpawnRunner(execRoot, options, executor);
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
-
- ExecutionCacheReply reply = ExecutionCacheReply.newBuilder()
- .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true))
- .setResult(ActionResult.newBuilder().setReturnCode(0))
- .build();
- when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onNext(ActionResult.getDefaultInstance());
+ responseObserver.onCompleted();
+ }
+ });
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
- verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.hasRecordedOutput()).isFalse();
@@ -167,69 +199,154 @@ public class GrpcRemoteExecutionClientTest {
@Test
public void cacheHitWithOutput() throws Exception {
- InMemoryCas casIface = new InMemoryCas();
- GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class);
- GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class);
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- GrpcRemoteExecutor executor =
- new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
- RemoteSpawnRunner client = new RemoteSpawnRunner(execRoot, options, executor);
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
- byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
- byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8);
- ContentDigest stdOutDigest = casIface.put(cacheStdOut);
- ContentDigest stdErrDigest = casIface.put(cacheStdErr);
-
- ExecutionCacheReply reply = ExecutionCacheReply.newBuilder()
- .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true))
- .setResult(ActionResult.newBuilder()
- .setReturnCode(0)
- .setStdoutDigest(stdOutDigest)
- .setStderrDigest(stdErrDigest))
- .build();
- when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
+ final Digest stdOutDigest = Digests.computeDigestUtf8("stdout");
+ final Digest stdErrDigest = Digests.computeDigestUtf8("stderr");
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onNext(
+ ActionResult.newBuilder()
+ .setStdoutDigest(stdOutDigest)
+ .setStderrDigest(stdErrDigest)
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
+ serviceRegistry.addService(
+ new FakeImmutableCacheByteStreamImpl(stdOutDigest, "stdout", stdErrDigest, "stderr"));
+
+ SpawnResult result = client.exec(simpleSpawn, simplePolicy);
+ assertThat(result.setupSuccess()).isTrue();
+ assertThat(result.exitCode()).isEqualTo(0);
+ assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
+ assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
+ }
+
+ @Test
+ public void cacheHitWithInlineOutput() throws Exception {
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onNext(
+ ActionResult.newBuilder()
+ .setStdoutRaw(ByteString.copyFromUtf8("stdout"))
+ .setStderrRaw(ByteString.copyFromUtf8("stderr"))
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
- verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
assertThat(outErr.errAsLatin1()).isEqualTo("stderr");
}
+ private Answer<StreamObserver<WriteRequest>> blobWriteAnswer(final byte[] data) {
+ final Digest digest = Digests.computeDigest(data);
+ return new Answer<StreamObserver<WriteRequest>>() {
+ @Override
+ public StreamObserver<WriteRequest> answer(InvocationOnMock invocation) {
+ @SuppressWarnings("unchecked")
+ final StreamObserver<WriteResponse> responseObserver =
+ (StreamObserver<WriteResponse>) invocation.getArguments()[0];
+ return new StreamObserver<WriteRequest>() {
+ @Override
+ public void onNext(WriteRequest request) {
+ assertThat(request.getResourceName()).contains(digest.getHash());
+ assertThat(request.getFinishWrite()).isTrue();
+ assertThat(request.getData().toByteArray()).isEqualTo(data);
+ responseObserver.onNext(
+ WriteResponse.newBuilder().setCommittedSize(request.getData().size()).build());
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ fail("An error occurred: " + t);
+ }
+ };
+ }
+ };
+ }
+
@Test
public void remotelyExecute() throws Exception {
- InMemoryCas casIface = new InMemoryCas();
- GrpcExecutionCacheInterface cacheIface = Mockito.mock(GrpcExecutionCacheInterface.class);
- GrpcExecutionInterface executionIface = Mockito.mock(GrpcExecutionInterface.class);
- RemoteOptions options = Options.getDefaults(RemoteOptions.class);
- GrpcRemoteExecutor executor =
- new GrpcRemoteExecutor(options, casIface, cacheIface, executionIface);
- RemoteSpawnRunner client = new RemoteSpawnRunner(execRoot, options, executor);
-
- scratch(simpleSpawn.getInputFiles().get(0), "xyz");
- byte[] cacheStdOut = "stdout".getBytes(StandardCharsets.UTF_8);
- byte[] cacheStdErr = "stderr".getBytes(StandardCharsets.UTF_8);
- ContentDigest stdOutDigest = casIface.put(cacheStdOut);
- ContentDigest stdErrDigest = casIface.put(cacheStdErr);
-
- ExecutionCacheReply reply = ExecutionCacheReply.newBuilder()
- .setStatus(ExecutionCacheStatus.newBuilder().setSucceeded(true))
- .build();
- when(cacheIface.getCachedResult(any(ExecutionCacheRequest.class))).thenReturn(reply);
-
- when(executionIface.execute(any(ExecuteRequest.class))).thenReturn(ImmutableList.of(
- ExecuteReply.newBuilder()
- .setStatus(ExecutionStatus.newBuilder().setSucceeded(true))
- .setResult(ActionResult.newBuilder()
- .setReturnCode(0)
- .setStdoutDigest(stdOutDigest)
- .setStderrDigest(stdErrDigest))
- .build()).iterator());
+ // The ActionCache service responds with a NOT_FOUND status, meaning a cache miss.
+ serviceRegistry.addService(
+ new ActionCacheImplBase() {
+ @Override
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
+ responseObserver.onError(
+ StatusProto.toStatusRuntimeException(
+ Status.newBuilder().setCode(Code.NOT_FOUND.getNumber()).build()));
+ }
+ });
+ final ActionResult actionResult =
+ ActionResult.newBuilder()
+ .setStdoutRaw(ByteString.copyFromUtf8("stdout"))
+ .setStderrRaw(ByteString.copyFromUtf8("stderr"))
+ .build();
+ serviceRegistry.addService(
+ new ExecutionImplBase() {
+ @Override
+ public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
+ responseObserver.onNext(
+ Operation.newBuilder()
+ .setDone(true)
+ .setResponse(
+ Any.pack(ExecuteResponse.newBuilder().setResult(actionResult).build()))
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
+ final Command command =
+ Command.newBuilder()
+ .addAllArguments(ImmutableList.of("/bin/echo", "Hi!"))
+ .addEnvironmentVariables(
+ Command.EnvironmentVariable.newBuilder()
+ .setName("VARIABLE")
+ .setValue("value")
+ .build())
+ .build();
+ final Digest cmdDigest = Digests.computeDigest(command);
+ serviceRegistry.addService(
+ new ContentAddressableStorageImplBase() {
+ @Override
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ FindMissingBlobsResponse.Builder b = FindMissingBlobsResponse.newBuilder();
+ final Set<Digest> requested = ImmutableSet.copyOf(request.getBlobDigestsList());
+ if (requested.contains(cmdDigest)) {
+ b.addMissingBlobDigests(cmdDigest);
+ } else if (requested.contains(inputDigest)) {
+ b.addMissingBlobDigests(inputDigest);
+ } else {
+ fail("Unexpected call to findMissingBlobs: " + request);
+ }
+ responseObserver.onNext(b.build());
+ responseObserver.onCompleted();
+ }
+ });
+
+ ByteStreamImplBase mockByteStreamImpl = Mockito.mock(ByteStreamImplBase.class);
+ when(mockByteStreamImpl.write(Mockito.<StreamObserver<WriteResponse>>anyObject()))
+ .thenAnswer(blobWriteAnswer(command.toByteArray()))
+ .thenAnswer(blobWriteAnswer("xyz".getBytes(UTF_8)));
+ serviceRegistry.addService(mockByteStreamImpl);
SpawnResult result = client.exec(simpleSpawn, simplePolicy);
- verify(cacheIface).getCachedResult(any(ExecutionCacheRequest.class));
assertThat(result.setupSuccess()).isTrue();
assertThat(result.exitCode()).isEqualTo(0);
assertThat(outErr.outAsLatin1()).isEqualTo("stdout");
diff --git a/src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java b/src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java
deleted file mode 100644
index cba62a536f..0000000000
--- a/src/test/java/com/google/devtools/build/lib/remote/InMemoryCas.java
+++ /dev/null
@@ -1,143 +0,0 @@
-// Copyright 2017 The Bazel Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package com.google.devtools.build.lib.remote;
-
-import com.google.common.base.Preconditions;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.protobuf.ByteString;
-import io.grpc.stub.StreamObserver;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/** An in-memory implementation of GrpcCasInterface. */
-final class InMemoryCas implements GrpcCasInterface {
- private final Map<ByteString, ByteString> content = new HashMap<>();
-
- public ContentDigest put(byte[] data) {
- ContentDigest digest = ContentDigests.computeDigest(data);
- ByteString key = digest.getDigest();
- ByteString value = ByteString.copyFrom(data);
- content.put(key, value);
- return digest;
- }
-
- @Override
- public CasLookupReply lookup(CasLookupRequest request) {
- CasStatus.Builder result = CasStatus.newBuilder();
- for (ContentDigest digest : request.getDigestList()) {
- ByteString key = digest.getDigest();
- if (!content.containsKey(key)) {
- result.addMissingDigest(digest);
- }
- }
- if (result.getMissingDigestCount() != 0) {
- result.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- } else {
- result.setSucceeded(true);
- }
- return CasLookupReply.newBuilder().setStatus(result).build();
- }
-
- @Override
- public CasUploadTreeMetadataReply uploadTreeMetadata(CasUploadTreeMetadataRequest request) {
- return CasUploadTreeMetadataReply.newBuilder()
- .setStatus(CasStatus.newBuilder().setSucceeded(true))
- .build();
- }
-
- @Override
- public CasDownloadTreeMetadataReply downloadTreeMetadata(
- CasDownloadTreeMetadataRequest request) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<CasDownloadReply> downloadBlob(CasDownloadBlobRequest request) {
- List<CasDownloadReply> result = new ArrayList<>();
- for (ContentDigest digest : request.getDigestList()) {
- CasDownloadReply.Builder builder = CasDownloadReply.newBuilder();
- ByteString item = content.get(digest.getDigest());
- if (item != null) {
- builder.setStatus(CasStatus.newBuilder().setSucceeded(true));
- builder.setData(BlobChunk.newBuilder().setData(item).setDigest(digest));
- } else {
- throw new IllegalStateException();
- }
- result.add(builder.build());
- }
- return result.iterator();
- }
-
- @Override
- public StreamObserver<CasUploadBlobRequest> uploadBlobAsync(
- final StreamObserver<CasUploadBlobReply> responseObserver) {
- return new StreamObserver<CasUploadBlobRequest>() {
- private ContentDigest digest;
- private ByteArrayOutputStream current;
-
- @Override
- public void onNext(CasUploadBlobRequest value) {
- BlobChunk chunk = value.getData();
- if (chunk.hasDigest()) {
- Preconditions.checkState(digest == null);
- digest = chunk.getDigest();
- current = new ByteArrayOutputStream();
- }
- try {
- current.write(chunk.getData().toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- responseObserver.onNext(
- CasUploadBlobReply.newBuilder()
- .setStatus(CasStatus.newBuilder().setSucceeded(true))
- .build());
- }
-
- @Override
- public void onError(Throwable t) {
- throw new RuntimeException(t);
- }
-
- @Override
- public void onCompleted() {
- ContentDigest check = ContentDigests.computeDigest(current.toByteArray());
- Preconditions.checkState(check.equals(digest), "%s != %s", digest, check);
- ByteString key = digest.getDigest();
- ByteString value = ByteString.copyFrom(current.toByteArray());
- digest = null;
- current = null;
- content.put(key, value);
- responseObserver.onCompleted();
- }
- };
- }
-} \ No newline at end of file
diff --git a/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java
index f9ef9111a6..94374f6364 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java
+++ b/src/test/java/com/google/devtools/build/lib/remote/TreeNodeRepositoryTest.java
@@ -22,14 +22,14 @@ import com.google.devtools.build.lib.actions.ActionInputFileCache;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.Root;
import com.google.devtools.build.lib.exec.SingleBuildFileCache;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
import com.google.devtools.build.lib.remote.TreeNodeRepository.TreeNode;
import com.google.devtools.build.lib.testutil.Scratch;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystem.HashFunction;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.Directory;
import java.util.ArrayList;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -54,8 +54,8 @@ public class TreeNodeRepositoryTest {
}
private TreeNodeRepository createTestTreeNodeRepository() {
- ActionInputFileCache inputFileCache = new SingleBuildFileCache(
- rootPath.getPathString(), scratch.getFileSystem());
+ ActionInputFileCache inputFileCache =
+ new SingleBuildFileCache(rootPath.getPathString(), scratch.getFileSystem());
return new TreeNodeRepository(rootPath, inputFileCache);
}
@@ -86,34 +86,26 @@ public class TreeNodeRepositoryTest {
TreeNode barNode = aNode.getChildEntries().get(0).getChild();
repo.computeMerkleDigests(root);
- ImmutableCollection<ContentDigest> digests = repo.getAllDigests(root);
- ContentDigest rootDigest = repo.getMerkleDigest(root);
- ContentDigest aDigest = repo.getMerkleDigest(aNode);
- ContentDigest fooDigest = repo.getMerkleDigest(fooNode);
- ContentDigest fooContentsDigest = ContentDigests.computeDigest(foo.getPath());
- ContentDigest barDigest = repo.getMerkleDigest(barNode);
- ContentDigest barContentsDigest = ContentDigests.computeDigest(bar.getPath());
- assertThat(digests)
- .containsExactly(
- rootDigest, aDigest, barDigest, barContentsDigest, fooDigest, fooContentsDigest);
+ ImmutableCollection<Digest> digests = repo.getAllDigests(root);
+ Digest rootDigest = repo.getMerkleDigest(root);
+ Digest aDigest = repo.getMerkleDigest(aNode);
+ Digest fooDigest = repo.getMerkleDigest(fooNode); // The contents digest.
+ Digest barDigest = repo.getMerkleDigest(barNode);
+ assertThat(digests).containsExactly(rootDigest, aDigest, barDigest, fooDigest);
- ArrayList<FileNode> fileNodes = new ArrayList<>();
+ ArrayList<Directory> directories = new ArrayList<>();
ArrayList<ActionInput> actionInputs = new ArrayList<>();
- repo.getDataFromDigests(digests, actionInputs, fileNodes);
+ repo.getDataFromDigests(digests, actionInputs, directories);
assertThat(actionInputs).containsExactly(bar, foo);
- assertThat(fileNodes).hasSize(4);
- FileNode rootFileNode = fileNodes.get(0);
- assertThat(rootFileNode.getChild(0).getPath()).isEqualTo("a");
- assertThat(rootFileNode.getChild(0).getDigest()).isEqualTo(aDigest);
- FileNode aFileNode = fileNodes.get(1);
- assertThat(aFileNode.getChild(0).getPath()).isEqualTo("bar");
- assertThat(aFileNode.getChild(0).getDigest()).isEqualTo(barDigest);
- assertThat(aFileNode.getChild(1).getPath()).isEqualTo("foo");
- assertThat(aFileNode.getChild(1).getDigest()).isEqualTo(fooDigest);
- FileNode barFileNode = fileNodes.get(2);
- assertThat(barFileNode.getFileMetadata().getDigest()).isEqualTo(barContentsDigest);
- FileNode fooFileNode = fileNodes.get(3);
- assertThat(fooFileNode.getFileMetadata().getDigest()).isEqualTo(fooContentsDigest);
+ assertThat(directories).hasSize(2);
+ Directory rootDirectory = directories.get(0);
+ assertThat(rootDirectory.getDirectories(0).getName()).isEqualTo("a");
+ assertThat(rootDirectory.getDirectories(0).getDigest()).isEqualTo(aDigest);
+ Directory aDirectory = directories.get(1);
+ assertThat(aDirectory.getFiles(0).getName()).isEqualTo("bar");
+ assertThat(aDirectory.getFiles(0).getDigest()).isEqualTo(barDigest);
+ assertThat(aDirectory.getFiles(1).getName()).isEqualTo("foo");
+ assertThat(aDirectory.getFiles(1).getDigest()).isEqualTo(fooDigest);
}
@Test
@@ -124,8 +116,8 @@ public class TreeNodeRepositoryTest {
TreeNodeRepository repo = createTestTreeNodeRepository();
TreeNode root = repo.buildFromActionInputs(ImmutableList.<ActionInput>of(foo1, foo2, foo3));
repo.computeMerkleDigests(root);
- // Reusing same node for the "foo" subtree: only need the root, root child, foo, and contents:
- assertThat(repo.getAllDigests(root)).hasSize(4);
+ // Reusing same node for the "foo" subtree: only need the root, root child, and foo contents:
+ assertThat(repo.getAllDigests(root)).hasSize(3);
}
@Test
@@ -141,16 +133,12 @@ public class TreeNodeRepositoryTest {
TreeNode aNode = root.getChildEntries().get(0).getChild();
TreeNode fooNode = aNode.getChildEntries().get(1).getChild(); // foo > bar in sort order!
TreeNode barNode = aNode.getChildEntries().get(0).getChild();
- ImmutableCollection<ContentDigest> digests = repo.getAllDigests(root);
- ContentDigest rootDigest = repo.getMerkleDigest(root);
- ContentDigest aDigest = repo.getMerkleDigest(aNode);
- ContentDigest fooDigest = repo.getMerkleDigest(fooNode);
- ContentDigest fooContentsDigest = ContentDigests.computeDigest(foo.getPath());
- ContentDigest barDigest = repo.getMerkleDigest(barNode);
- ContentDigest barContentsDigest = ContentDigests.computeDigest(new byte[0]);
- assertThat(digests)
- .containsExactly(
- rootDigest, aDigest, barDigest, barContentsDigest, fooDigest, fooContentsDigest);
+ ImmutableCollection<Digest> digests = repo.getAllDigests(root);
+ Digest rootDigest = repo.getMerkleDigest(root);
+ Digest aDigest = repo.getMerkleDigest(aNode);
+ Digest fooDigest = repo.getMerkleDigest(fooNode);
+ Digest barDigest = repo.getMerkleDigest(barNode);
+ assertThat(digests).containsExactly(rootDigest, aDigest, barDigest, fooDigest);
}
@Test
diff --git a/src/test/shell/bazel/remote_execution_test.sh b/src/test/shell/bazel/remote_execution_test.sh
index 6ed3101589..37515bce29 100755
--- a/src/test/shell/bazel/remote_execution_test.sh
+++ b/src/test/shell/bazel/remote_execution_test.sh
@@ -30,7 +30,6 @@ function set_up() {
${bazel_data}/src/tools/remote_worker/remote_worker \
--work_path=${work_path} \
--listen_port=${worker_port} \
- --grpc_max_chunk_size_bytes=120000000 \
--hazelcast_standalone_listen_port=${hazelcast_port} \
--pid_file=${pid_file} >& $TEST_log &
local wait_seconds=0
@@ -187,7 +186,6 @@ EOF
bazel --host_jvm_args=-Dbazel.DigestFunction=SHA1 build \
--spawn_strategy=remote \
--noremote_local_fallback \
- --grpc_max_chunk_size_bytes=120000000 \
--remote_executor=localhost:${worker_port} \
--remote_cache=localhost:${worker_port} \
//a:large_output >& $TEST_log \
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
index a91e5e5d73..ddbb5ede52 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/BUILD
@@ -25,13 +25,19 @@ java_library(
"//src/main/java/com/google/devtools/build/lib:vfs",
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/common/options",
- "//src/main/protobuf:remote_protocol_java_grpc",
- "//src/main/protobuf:remote_protocol_java_proto",
"//third_party:guava",
"//third_party:hazelcast",
"//third_party:netty",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
"//third_party/protobuf:protobuf_java_util",
+ "@googleapis//:google_bytestream_bytestream_java_grpc",
+ "@googleapis//:google_bytestream_bytestream_java_proto",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_grpc",
+ "@googleapis//:google_devtools_remoteexecution_v1test_remote_execution_java_proto",
+ "@googleapis//:google_longrunning_operations_java_proto",
+ "@googleapis//:google_rpc_code_java_proto",
+ "@googleapis//:google_rpc_error_details_java_proto",
+ "@googleapis//:google_rpc_status_java_proto",
],
)
diff --git a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
index 8558be720f..0eee6695e1 100644
--- a/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
+++ b/src/tools/remote_worker/src/main/java/com/google/devtools/build/remote/RemoteWorker.java
@@ -14,43 +14,16 @@
package com.google.devtools.build.remote;
-import com.google.common.collect.ImmutableList;
-import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions;
-import com.google.devtools.build.lib.exec.SpawnResult.Status;
+import com.google.bytestream.ByteStreamGrpc.ByteStreamImplBase;
+import com.google.bytestream.ByteStreamProto.ReadRequest;
+import com.google.bytestream.ByteStreamProto.ReadResponse;
+import com.google.bytestream.ByteStreamProto.WriteRequest;
+import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.devtools.build.lib.remote.CacheNotFoundException;
-import com.google.devtools.build.lib.remote.CasServiceGrpc.CasServiceImplBase;
-import com.google.devtools.build.lib.remote.ChannelOptions;
import com.google.devtools.build.lib.remote.Chunker;
-import com.google.devtools.build.lib.remote.ContentDigests;
-import com.google.devtools.build.lib.remote.ContentDigests.ActionKey;
-import com.google.devtools.build.lib.remote.ExecuteServiceGrpc.ExecuteServiceImplBase;
-import com.google.devtools.build.lib.remote.ExecutionCacheServiceGrpc.ExecutionCacheServiceImplBase;
+import com.google.devtools.build.lib.remote.Digests;
+import com.google.devtools.build.lib.remote.Digests.ActionKey;
import com.google.devtools.build.lib.remote.RemoteOptions;
-import com.google.devtools.build.lib.remote.RemoteProtocol;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Action;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ActionResult;
-import com.google.devtools.build.lib.remote.RemoteProtocol.BlobChunk;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasDownloadReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasLookupRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadBlobRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.CasUploadTreeMetadataRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Command.EnvironmentEntry;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ContentDigest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecuteRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetReply;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheSetRequest;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionCacheStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.ExecutionStatus;
-import com.google.devtools.build.lib.remote.RemoteProtocol.FileNode;
-import com.google.devtools.build.lib.remote.RemoteProtocol.Platform;
import com.google.devtools.build.lib.remote.SimpleBlobStore;
import com.google.devtools.build.lib.remote.SimpleBlobStoreActionCache;
import com.google.devtools.build.lib.remote.SimpleBlobStoreFactory;
@@ -61,23 +34,48 @@ import com.google.devtools.build.lib.shell.CommandResult;
import com.google.devtools.build.lib.shell.TimeoutKillableObserver;
import com.google.devtools.build.lib.unix.UnixFileSystem;
import com.google.devtools.build.lib.util.OS;
-import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.ProcessUtils;
import com.google.devtools.build.lib.vfs.FileSystem;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.JavaIoFileSystem;
import com.google.devtools.build.lib.vfs.Path;
-import com.google.devtools.common.options.Options;
import com.google.devtools.common.options.OptionsParser;
+import com.google.devtools.remoteexecution.v1test.Action;
+import com.google.devtools.remoteexecution.v1test.ActionCacheGrpc.ActionCacheImplBase;
+import com.google.devtools.remoteexecution.v1test.ActionResult;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.BatchUpdateBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.Command.EnvironmentVariable;
+import com.google.devtools.remoteexecution.v1test.ContentAddressableStorageGrpc.ContentAddressableStorageImplBase;
+import com.google.devtools.remoteexecution.v1test.Digest;
+import com.google.devtools.remoteexecution.v1test.ExecuteRequest;
+import com.google.devtools.remoteexecution.v1test.ExecuteResponse;
+import com.google.devtools.remoteexecution.v1test.ExecutionGrpc.ExecutionImplBase;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsRequest;
+import com.google.devtools.remoteexecution.v1test.FindMissingBlobsResponse;
+import com.google.devtools.remoteexecution.v1test.GetActionResultRequest;
+import com.google.devtools.remoteexecution.v1test.Platform;
+import com.google.devtools.remoteexecution.v1test.UpdateActionResultRequest;
+import com.google.devtools.remoteexecution.v1test.UpdateBlobRequest;
+import com.google.longrunning.Operation;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.util.Durations;
+import com.google.rpc.BadRequest;
+import com.google.rpc.BadRequest.FieldViolation;
+import com.google.rpc.Code;
+import com.google.rpc.Status;
import io.grpc.Server;
+import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyServerBuilder;
+import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
+import java.io.StringWriter;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -88,6 +86,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.annotation.Nullable;
/**
* Implements a remote worker that accepts work items as protobufs. The server implementation is
@@ -96,13 +95,10 @@ import java.util.logging.Logger;
public class RemoteWorker {
private static final Logger LOG = Logger.getLogger(RemoteWorker.class.getName());
private static final boolean LOG_FINER = LOG.isLoggable(Level.FINER);
-
- private static final int LOCAL_EXEC_ERROR = -1;
- private static final int SIGALRM_EXIT_CODE = /*SIGNAL_BASE=*/128 + /*SIGALRM=*/14;
-
- private final CasServiceImplBase casServer;
- private final ExecuteServiceImplBase execServer;
- private final ExecutionCacheServiceImplBase execCacheServer;
+ private final ContentAddressableStorageImplBase casServer;
+ private final ByteStreamImplBase bsServer;
+ private final ExecutionImplBase execServer;
+ private final ActionCacheImplBase actionCacheServer;
private final SimpleBlobStoreActionCache cache;
private final RemoteWorkerOptions workerOptions;
private final RemoteOptions remoteOptions;
@@ -123,16 +119,16 @@ public class RemoteWorker {
execServer = null;
}
casServer = new CasServer();
- execCacheServer = new ExecutionCacheServer();
+ bsServer = new ByteStreamServer();
+ actionCacheServer = new ActionCacheServer();
}
public Server startServer() throws IOException {
NettyServerBuilder b =
NettyServerBuilder.forPort(workerOptions.listenPort)
- .maxMessageSize(ChannelOptions.create(Options.getDefaults(AuthAndTLSOptions.class),
- remoteOptions.grpcMaxChunkSizeBytes).maxMessageSize())
.addService(casServer)
- .addService(execCacheServer);
+ .addService(bsServer)
+ .addService(actionCacheServer);
if (execServer != null) {
b.addService(execServer);
} else {
@@ -147,215 +143,258 @@ public class RemoteWorker {
return server;
}
- class CasServer extends CasServiceImplBase {
- private static final int MAX_MEMORY_KBYTES = 512 * 1024;
+ private static @Nullable Digest parseDigestFromResourceName(String resourceName) {
+ try {
+ String[] tokens = resourceName.split("/");
+ if (tokens.length < 2) {
+ return null;
+ }
+ String hash = tokens[tokens.length - 2];
+ long size = Long.parseLong(tokens[tokens.length - 1]);
+ return Digests.buildDigest(hash, size);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+
+ private static StatusRuntimeException internalError(Exception e) {
+ return StatusProto.toStatusRuntimeException(internalErrorStatus(e));
+ }
+
+ private static com.google.rpc.Status internalErrorStatus(Exception e) {
+ return Status.newBuilder()
+ .setCode(Code.INTERNAL.getNumber())
+ .setMessage("Internal error: " + e)
+ .build();
+ }
+
+ private static StatusRuntimeException notFoundError(Digest digest) {
+ return StatusProto.toStatusRuntimeException(notFoundStatus(digest));
+ }
+
+ private static com.google.rpc.Status notFoundStatus(Digest digest) {
+ return Status.newBuilder()
+ .setCode(Code.NOT_FOUND.getNumber())
+        .setMessage("Digest not found: " + digest)
+ .build();
+ }
+
+ private static StatusRuntimeException invalidArgumentError(String field, String desc) {
+ return StatusProto.toStatusRuntimeException(invalidArgumentStatus(field, desc));
+ }
+
+ private static com.google.rpc.Status invalidArgumentStatus(String field, String desc) {
+ FieldViolation v = FieldViolation.newBuilder().setField(field).setDescription(desc).build();
+ return Status.newBuilder()
+ .setCode(Code.INVALID_ARGUMENT.getNumber())
+ .setMessage("invalid argument(s): " + field + ": " + desc)
+ .addDetails(Any.pack(BadRequest.newBuilder().addFieldViolations(v).build()))
+ .build();
+ }
+ class CasServer extends ContentAddressableStorageImplBase {
@Override
- public void lookup(CasLookupRequest request, StreamObserver<CasLookupReply> responseObserver) {
- CasLookupReply.Builder reply = CasLookupReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- for (ContentDigest digest : request.getDigestList()) {
+ public void findMissingBlobs(
+ FindMissingBlobsRequest request,
+ StreamObserver<FindMissingBlobsResponse> responseObserver) {
+ FindMissingBlobsResponse.Builder response = FindMissingBlobsResponse.newBuilder();
+ for (Digest digest : request.getBlobDigestsList()) {
if (!cache.containsKey(digest)) {
- status.addMissingDigest(digest);
+ response.addMissingBlobDigests(digest);
}
}
- if (status.getMissingDigestCount() > 0) {
- status.setSucceeded(false);
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- } else {
- status.setSucceeded(true);
- }
- responseObserver.onNext(reply.build());
+ responseObserver.onNext(response.build());
responseObserver.onCompleted();
}
@Override
- public void uploadTreeMetadata(
- CasUploadTreeMetadataRequest request,
- StreamObserver<CasUploadTreeMetadataReply> responseObserver) {
- try {
- for (FileNode treeNode : request.getTreeNodeList()) {
- cache.uploadBlob(treeNode.toByteArray());
+ public void batchUpdateBlobs(
+ BatchUpdateBlobsRequest request,
+ StreamObserver<BatchUpdateBlobsResponse> responseObserver) {
+ BatchUpdateBlobsResponse.Builder batchResponse = BatchUpdateBlobsResponse.newBuilder();
+ for (UpdateBlobRequest r : request.getRequestsList()) {
+ BatchUpdateBlobsResponse.Response.Builder resp = batchResponse.addResponsesBuilder();
+ try {
+ Digest digest = cache.uploadBlob(r.getData().toByteArray());
+ if (!r.getContentDigest().equals(digest)) {
+ String err =
+ "Upload digest " + r.getContentDigest() + " did not match data digest: " + digest;
+ resp.setStatus(invalidArgumentStatus("content_digest", err));
+ continue;
+ }
+ resp.getStatusBuilder().setCode(Code.OK.getNumber());
+ } catch (Exception e) {
+ resp.setStatus(internalErrorStatus(e));
}
- responseObserver.onNext(
- CasUploadTreeMetadataReply.newBuilder()
- .setStatus(CasStatus.newBuilder().setSucceeded(true))
- .build());
- } catch (Exception e) {
- LOG.warning("Request failed: " + e.toString());
- CasUploadTreeMetadataReply.Builder reply = CasUploadTreeMetadataReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(CasStatus.ErrorCode.UNKNOWN)
- .setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
- } finally {
- responseObserver.onCompleted();
}
+ responseObserver.onNext(batchResponse.build());
+ responseObserver.onCompleted();
}
+ }
+ class ByteStreamServer extends ByteStreamImplBase {
@Override
- public void downloadBlob(
- CasDownloadBlobRequest request, StreamObserver<CasDownloadReply> responseObserver) {
- CasDownloadReply.Builder reply = CasDownloadReply.newBuilder();
- CasStatus.Builder status = reply.getStatusBuilder();
- for (ContentDigest digest : request.getDigestList()) {
- if (!cache.containsKey(digest)) {
- status.addMissingDigest(digest);
- }
+ public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
+ Digest digest = parseDigestFromResourceName(request.getResourceName());
+      if (digest == null) {
+        responseObserver.onError(invalidArgumentError("resource_name",
+            "Failed parsing digest from resource_name: " + request.getResourceName()));
+        // Must return here: falling through would NPE in cache.containsKey(digest) below.
+        return;
+      }
- if (status.getMissingDigestCount() > 0) {
- status.setSucceeded(false);
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- responseObserver.onNext(reply.build());
- responseObserver.onCompleted();
+ if (!cache.containsKey(digest)) {
+ responseObserver.onError(notFoundError(digest));
return;
}
- status.setSucceeded(true);
try {
- // This still relies on the total blob size to be small enough to fit in memory
- // simultaneously! TODO(olaola): refactor to fix this if the need arises.
- Chunker.Builder b = new Chunker.Builder().chunkSize(remoteOptions.grpcMaxChunkSizeBytes);
- for (ContentDigest digest : request.getDigestList()) {
- b.addInput(cache.downloadBlob(digest));
- }
- Chunker c = b.build();
+ // This still relies on the blob size to be small enough to fit in memory.
+ // TODO(olaola): refactor to fix this if the need arises.
+ Chunker c = Chunker.from(cache.downloadBlob(digest));
while (c.hasNext()) {
- reply.setData(c.next());
- responseObserver.onNext(reply.build());
- if (reply.hasStatus()) {
- reply.clearStatus(); // Only send status on first chunk.
- }
+ responseObserver.onNext(
+ ReadResponse.newBuilder().setData(ByteString.copyFrom(c.next().getData())).build());
}
- } catch (IOException e) {
- // This cannot happen, as we are chunking in-memory blobs.
- throw new RuntimeException("Internal error: " + e);
+ responseObserver.onCompleted();
} catch (CacheNotFoundException e) {
// This can only happen if an item gets evicted right after we check.
- reply.clearData();
- status.setSucceeded(false);
- status.setError(CasStatus.ErrorCode.MISSING_DIGEST);
- status.addMissingDigest(e.getMissingDigest());
- responseObserver.onNext(reply.build());
- } finally {
- responseObserver.onCompleted();
+ responseObserver.onError(notFoundError(digest));
+ } catch (Exception e) {
+ LOG.warning("Read request failed: " + e);
+ responseObserver.onError(internalError(e));
}
}
@Override
- public StreamObserver<CasUploadBlobRequest> uploadBlob(
- final StreamObserver<CasUploadBlobReply> responseObserver) {
- return new StreamObserver<CasUploadBlobRequest>() {
+ public StreamObserver<WriteRequest> write(
+ final StreamObserver<WriteResponse> responseObserver) {
+ return new StreamObserver<WriteRequest>() {
byte[] blob = null;
- ContentDigest digest = null;
+ Digest digest = null;
long offset = 0;
+ String resourceName = null;
+ boolean closed = false;
@Override
- public void onNext(CasUploadBlobRequest request) {
- BlobChunk chunk = request.getData();
- try {
- if (chunk.hasDigest()) {
- // Check if the previous chunk was really done.
- Preconditions.checkArgument(
- digest == null || offset == 0,
- "Missing input chunk for digest %s",
- digest == null ? "" : ContentDigests.toString(digest));
- digest = chunk.getDigest();
- // This unconditionally downloads the whole blob into memory!
- Preconditions.checkArgument((int) (digest.getSizeBytes() / 1024) < MAX_MEMORY_KBYTES);
- blob = new byte[(int) digest.getSizeBytes()];
- }
- Preconditions.checkArgument(digest != null, "First chunk contains no digest");
- Preconditions.checkArgument(
- offset == chunk.getOffset(),
- "Missing input chunk for digest %s",
- ContentDigests.toString(digest));
- if (digest.getSizeBytes() > 0) {
- chunk.getData().copyTo(blob, (int) offset);
- offset = (offset + chunk.getData().size()) % digest.getSizeBytes();
- }
- if (offset == 0) {
- ContentDigest uploadedDigest = cache.uploadBlob(blob);
- Preconditions.checkArgument(
- uploadedDigest.equals(digest),
- "Digest mismatch: client sent %s, server computed %s",
- ContentDigests.toString(digest),
- ContentDigests.toString(uploadedDigest));
- }
- } catch (Exception e) {
- LOG.warning("Request failed: " + e.toString());
- CasUploadBlobReply.Builder reply = CasUploadBlobReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(
- e instanceof IllegalArgumentException
- ? CasStatus.ErrorCode.INVALID_ARGUMENT
- : CasStatus.ErrorCode.UNKNOWN)
- .setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
+ public void onNext(WriteRequest request) {
+ if (closed) {
+ return;
+ }
+          if (digest == null) {
+            resourceName = request.getResourceName();
+            digest = parseDigestFromResourceName(resourceName);
+            // Check for parse failure BEFORE allocating: digest may still be null here.
+            if (digest == null) {
+              responseObserver.onError(
+                  invalidArgumentError(
+                      "resource_name",
+                      "Failed parsing digest from resource_name: " + resourceName));
+              closed = true;
+              return;
+            }
+            blob = new byte[(int) digest.getSizeBytes()];
+          }
+ if (request.getWriteOffset() != offset) {
+ responseObserver.onError(
+ invalidArgumentError(
+ "write_offset",
+ "Expected:" + offset + ", received: " + request.getWriteOffset()));
+ closed = true;
+ return;
+ }
+ if (!request.getResourceName().isEmpty()
+ && !request.getResourceName().equals(resourceName)) {
+ responseObserver.onError(
+ invalidArgumentError(
+ "resource_name",
+ "Expected:" + resourceName + ", received: " + request.getResourceName()));
+ closed = true;
+ return;
+ }
+ long size = request.getData().size();
+ if (size > 0) {
+ request.getData().copyTo(blob, (int) offset);
+ offset += size;
+ }
+ boolean shouldFinishWrite = offset == digest.getSizeBytes();
+ if (shouldFinishWrite != request.getFinishWrite()) {
+ responseObserver.onError(
+ invalidArgumentError(
+ "finish_write",
+ "Expected:" + shouldFinishWrite + ", received: " + request.getFinishWrite()));
+ closed = true;
}
}
@Override
public void onError(Throwable t) {
- LOG.warning("Request errored remotely: " + t);
+ LOG.warning("Write request errored remotely: " + t);
+ closed = true;
}
@Override
public void onCompleted() {
- responseObserver.onCompleted();
+ if (closed) {
+ return;
+ }
+ if (digest == null || offset != digest.getSizeBytes()) {
+ responseObserver.onError(
+ StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.FAILED_PRECONDITION.getNumber())
+ .setMessage("Request completed before all data was sent.")
+ .build()));
+ closed = true;
+ return;
+ }
+ try {
+ Digest d = cache.uploadBlob(blob);
+ if (!d.equals(digest)) {
+ String err = "Received digest " + digest + " does not match computed digest " + d;
+ responseObserver.onError(invalidArgumentError("resource_name", err));
+ closed = true;
+ return;
+ }
+ responseObserver.onNext(WriteResponse.newBuilder().setCommittedSize(offset).build());
+ responseObserver.onCompleted();
+ } catch (Exception e) {
+ LOG.warning("Write request failed: " + e);
+ responseObserver.onError(internalError(e));
+ closed = true;
+ }
}
};
}
}
- class ExecutionCacheServer extends ExecutionCacheServiceImplBase {
+ class ActionCacheServer extends ActionCacheImplBase {
@Override
- public void getCachedResult(
- ExecutionCacheRequest request, StreamObserver<ExecutionCacheReply> responseObserver) {
+ public void getActionResult(
+ GetActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
try {
- ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
- ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
+ ActionKey actionKey = Digests.unsafeActionKeyFromDigest(request.getActionDigest());
ActionResult result = cache.getCachedActionResult(actionKey);
- if (result != null) {
- reply.setResult(result);
+ if (result == null) {
+ responseObserver.onError(notFoundError(request.getActionDigest()));
+ return;
}
- reply.getStatusBuilder().setSucceeded(true);
- responseObserver.onNext(reply.build());
- } catch (Exception e) {
- LOG.warning("getCachedActionResult request failed: " + e.toString());
- ExecutionCacheReply.Builder reply = ExecutionCacheReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
- responseObserver.onNext(reply.build());
- } finally {
+ responseObserver.onNext(result);
responseObserver.onCompleted();
+ } catch (Exception e) {
+ LOG.warning("getActionResult request failed: " + e);
+ responseObserver.onError(internalError(e));
}
}
@Override
- public void setCachedResult(
- ExecutionCacheSetRequest request, StreamObserver<ExecutionCacheSetReply> responseObserver) {
+ public void updateActionResult(
+ UpdateActionResultRequest request, StreamObserver<ActionResult> responseObserver) {
try {
- ActionKey actionKey = ContentDigests.unsafeActionKeyFromDigest(request.getActionDigest());
- cache.setCachedActionResult(actionKey, request.getResult());
- ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
- reply.getStatusBuilder().setSucceeded(true);
- responseObserver.onNext(reply.build());
- } catch (Exception e) {
- LOG.warning("setCachedActionResult request failed: " + e.toString());
- ExecutionCacheSetReply.Builder reply = ExecutionCacheSetReply.newBuilder();
- reply
- .getStatusBuilder()
- .setSucceeded(false)
- .setError(ExecutionCacheStatus.ErrorCode.UNKNOWN);
- responseObserver.onNext(reply.build());
- } finally {
+ ActionKey actionKey = Digests.unsafeActionKeyFromDigest(request.getActionDigest());
+ cache.setCachedActionResult(actionKey, request.getActionResult());
+ responseObserver.onNext(request.getActionResult());
responseObserver.onCompleted();
+ } catch (Exception e) {
+ LOG.warning("updateActionResult request failed: " + e);
+ responseObserver.onError(internalError(e));
}
}
}
@@ -363,23 +402,26 @@ public class RemoteWorker {
// How long to wait for the uid command.
private static final Duration uidTimeout = Durations.fromMicros(30);
- class ExecutionServer extends ExecuteServiceImplBase {
+ class ExecutionServer extends ExecutionImplBase {
private final Path workPath;
//The name of the container image entry in the Platform proto
- // (see src/main/protobuf/remote_protocol.proto and
+ // (see third_party/googleapis/devtools/remoteexecution/*/remote_execution.proto and
// experimental_remote_platform_override in
// src/main/java/com/google/devtools/build/lib/remote/RemoteOptions.java)
public static final String CONTAINER_IMAGE_ENTRY_NAME = "container-image";
+ private static final int LOCAL_EXEC_ERROR = -1;
+
public ExecutionServer(Path workPath) {
this.workPath = workPath;
}
- private Map<String, String> getEnvironmentVariables(RemoteProtocol.Command command) {
+ private Map<String, String> getEnvironmentVariables(
+ com.google.devtools.remoteexecution.v1test.Command command) {
HashMap<String, String> result = new HashMap<>();
- for (EnvironmentEntry entry : command.getEnvironmentList()) {
- result.put(entry.getVariable(), entry.getValue());
+ for (EnvironmentVariable v : command.getEnvironmentVariablesList()) {
+ result.put(v.getName(), v.getValue());
}
return result;
}
@@ -412,17 +454,14 @@ public class RemoteWorker {
// null. Otherwise returns docker container name from the parameters.
private String dockerContainer(Action action) throws IllegalArgumentException {
String result = null;
- List<Platform.Property> entries = action.getPlatform().getEntryList();
-
- for (Platform.Property entry : entries) {
- if (entry.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
- if (result == null) {
- result = entry.getValue();
- } else {
+ for (Platform.Property property : action.getPlatform().getPropertiesList()) {
+ if (property.getName().equals(CONTAINER_IMAGE_ENTRY_NAME)) {
+ if (result != null) {
// Multiple container name entries
throw new IllegalArgumentException(
"Multiple entries for " + CONTAINER_IMAGE_ENTRY_NAME + " in action.Platform");
}
+ result = property.getValue();
}
}
return result;
@@ -479,38 +518,34 @@ public class RemoteWorker {
new File(pathString));
}
- public ExecuteReply execute(Action action, Path execRoot)
- throws IOException, InterruptedException, IllegalArgumentException {
- ByteArrayOutputStream stdout = new ByteArrayOutputStream();
- ByteArrayOutputStream stderr = new ByteArrayOutputStream();
- RemoteProtocol.Command command;
- try {
- command =
- RemoteProtocol.Command.parseFrom(cache.downloadBlob(action.getCommandDigest()));
- cache.downloadTree(action.getInputRootDigest(), execRoot);
- } catch (CacheNotFoundException e) {
- LOG.warning("Cache miss on " + ContentDigests.toString(e.getMissingDigest()));
- return ExecuteReply.newBuilder()
- .setCasError(
- CasStatus.newBuilder()
- .setSucceeded(false)
- .addMissingDigest(e.getMissingDigest())
- .setError(CasStatus.ErrorCode.MISSING_DIGEST)
- .setErrorDetail(e.toString()))
- .setStatus(
- ExecutionStatus.newBuilder()
- .setExecuted(false)
- .setSucceeded(false)
- .setError(
- e.getMissingDigest() == action.getCommandDigest()
- ? ExecutionStatus.ErrorCode.MISSING_COMMAND
- : ExecutionStatus.ErrorCode.MISSING_INPUT)
- .setErrorDetail(e.toString()))
- .build();
+ static final int MAX_BLOB_SIZE_FOR_INLINE = 1024 * 10;
+
+ private void passOutErr(byte[] stdout, byte[] stderr, ActionResult.Builder result)
+ throws InterruptedException {
+ if (stdout.length <= MAX_BLOB_SIZE_FOR_INLINE) {
+ result.setStdoutRaw(ByteString.copyFrom(stdout));
+ } else if (stdout.length > 0) {
+ result.setStdoutDigest(cache.uploadBlob(stdout));
+ }
+ if (stderr.length <= MAX_BLOB_SIZE_FOR_INLINE) {
+ result.setStderrRaw(ByteString.copyFrom(stderr));
+ } else if (stderr.length > 0) {
+ result.setStderrDigest(cache.uploadBlob(stderr));
}
+ }
- List<Path> outputs = new ArrayList<>(action.getOutputPathList().size());
- for (String output : action.getOutputPathList()) {
+ public ActionResult execute(Action action, Path execRoot)
+ throws IOException, InterruptedException, IllegalArgumentException, CacheNotFoundException {
+ ByteArrayOutputStream stdout = new ByteArrayOutputStream();
+ ByteArrayOutputStream stderr = new ByteArrayOutputStream();
+ ActionResult.Builder result = ActionResult.newBuilder();
+ com.google.devtools.remoteexecution.v1test.Command command =
+ com.google.devtools.remoteexecution.v1test.Command.parseFrom(
+ cache.downloadBlob(action.getCommandDigest()));
+ cache.downloadTree(action.getInputRootDigest(), execRoot);
+
+ List<Path> outputs = new ArrayList<>(action.getOutputFilesList().size());
+ for (String output : action.getOutputFilesList()) {
Path file = execRoot.getRelative(output);
if (file.exists()) {
throw new FileAlreadyExistsException("Output file already exists: " + file);
@@ -518,63 +553,58 @@ public class RemoteWorker {
FileSystemUtils.createDirectoryAndParents(file.getParentDirectory());
outputs.add(file);
}
+ // TODO(olaola): support output directories.
// TODO(ulfjack): This is basically a copy of LocalSpawnRunner. Ideally, we'd use that
// implementation instead of copying it.
- // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet.
- int timeoutSeconds = 60 * 15;
Command cmd =
getCommand(
action,
- command.getArgvList().toArray(new String[] {}),
+ command.getArgumentsList().toArray(new String[] {}),
getEnvironmentVariables(command),
execRoot.getPathString());
-
long startTime = System.currentTimeMillis();
- CommandResult cmdResult;
+ CommandResult cmdResult = null;
try {
cmdResult = cmd.execute(Command.NO_INPUT, Command.NO_OBSERVER, stdout, stderr, true);
} catch (AbnormalTerminationException e) {
cmdResult = e.getResult();
} catch (CommandException e) {
- // At the time this comment was written, this must be a ExecFailedException encapsulating an
- // IOException from the underlying Subprocess.Factory.
- LOG.warning("Execution failed for " + command.getArgvList());
- return ExecuteReply.newBuilder()
- .setResult(
- ActionResult.newBuilder()
- .setReturnCode(LOCAL_EXEC_ERROR))
- .setStatus(
- ExecutionStatus.newBuilder()
- .setExecuted(false)
- .setSucceeded(false)
- .setError(ExecutionStatus.ErrorCode.EXEC_FAILED)
- .setErrorDetail(e.toString()))
- .build();
+        // At the time this comment was written, this must be an ExecFailedException encapsulating
+        // an IOException from the underlying Subprocess.Factory.
+ }
+ final int timeoutSeconds = 60 * 15;
+ // TODO(ulfjack): Timeout is specified in ExecuteRequest, but not passed in yet.
+ boolean wasTimeout =
+ cmdResult != null && cmdResult.getTerminationStatus().timedout()
+ || wasTimeout(timeoutSeconds, System.currentTimeMillis() - startTime);
+ int exitCode;
+ if (wasTimeout) {
+ final String errMessage =
+ "Command:\n"
+ + command.getArgumentsList()
+ + "\nexceeded deadline of "
+ + timeoutSeconds
+ + "seconds";
+ LOG.warning(errMessage);
+ throw StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.DEADLINE_EXCEEDED.getNumber())
+ .setMessage(errMessage)
+ .build());
+ } else if (cmdResult == null) {
+ exitCode = LOCAL_EXEC_ERROR;
+ } else {
+ exitCode = cmdResult.getTerminationStatus().getRawExitCode();
+ }
+
+ passOutErr(stdout.toByteArray(), stderr.toByteArray(), result);
+ cache.uploadAllResults(execRoot, outputs, result);
+ ActionResult finalResult = result.setExitCode(exitCode).build();
+ if (exitCode == 0) {
+ cache.setCachedActionResult(Digests.computeActionKey(action), finalResult);
}
- long wallTime = System.currentTimeMillis() - startTime;
- boolean wasTimeout = cmdResult.getTerminationStatus().timedout()
- || wasTimeout(timeoutSeconds, wallTime);
- Status status = wasTimeout ? Status.TIMEOUT : Status.SUCCESS;
- int exitCode = status == Status.TIMEOUT
- ? SIGALRM_EXIT_CODE
- : cmdResult.getTerminationStatus().getRawExitCode();
-
- ImmutableList<ContentDigest> outErrDigests =
- cache.uploadBlobs(ImmutableList.of(stdout.toByteArray(), stderr.toByteArray()));
- ContentDigest stdoutDigest = outErrDigests.get(0);
- ContentDigest stderrDigest = outErrDigests.get(1);
- ActionResult.Builder actionResult =
- ActionResult.newBuilder()
- .setReturnCode(exitCode)
- .setStdoutDigest(stdoutDigest)
- .setStderrDigest(stderrDigest);
- cache.uploadAllResults(execRoot, outputs, actionResult);
- cache.setCachedActionResult(ContentDigests.computeActionKey(action), actionResult.build());
- return ExecuteReply.newBuilder()
- .setResult(actionResult)
- .setStatus(ExecutionStatus.newBuilder().setExecuted(true).setSucceeded(true))
- .build();
+ return finalResult;
}
private boolean wasTimeout(int timeoutSeconds, long wallTimeMillis) {
@@ -582,7 +612,7 @@ public class RemoteWorker {
}
@Override
- public void execute(ExecuteRequest request, StreamObserver<ExecuteReply> responseObserver) {
+ public void execute(ExecuteRequest request, StreamObserver<Operation> responseObserver) {
Path tempRoot = workPath.getRelative("build-" + UUID.randomUUID().toString());
try {
tempRoot.createDirectory();
@@ -591,33 +621,42 @@ public class RemoteWorker {
"Work received has "
+ request.getTotalInputFileCount()
+ " input files and "
- + request.getAction().getOutputPathCount()
+ + request.getAction().getOutputFilesCount()
+ " output files.");
}
- ExecuteReply reply = execute(request.getAction(), tempRoot);
- responseObserver.onNext(reply);
+ ActionResult result = execute(request.getAction(), tempRoot);
+ responseObserver.onNext(
+ Operation.newBuilder()
+ .setDone(true)
+ .setResponse(Any.pack(ExecuteResponse.newBuilder().setResult(result).build()))
+ .build());
+ responseObserver.onCompleted();
if (workerOptions.debug) {
- if (!reply.getStatus().getSucceeded()) {
- LOG.warning("Work failed. Request: " + request.toString() + ".");
- } else if (LOG_FINER) {
- LOG.fine("Work completed.");
- }
- }
- if (!workerOptions.debug) {
- FileSystemUtils.deleteTree(tempRoot);
+ LOG.fine("Work completed.");
+ LOG.warning("Preserving work directory " + tempRoot);
} else {
- LOG.warning("Preserving work directory " + tempRoot.toString() + ".");
+ FileSystemUtils.deleteTree(tempRoot);
}
- } catch (IOException | InterruptedException e) {
- LOG.log(Level.SEVERE, "Failure", e);
- ExecuteReply.Builder reply = ExecuteReply.newBuilder();
- reply.getStatusBuilder().setSucceeded(false).setErrorDetail(e.toString());
- responseObserver.onNext(reply.build());
+ } catch (CacheNotFoundException e) {
+ LOG.warning("Cache miss on " + e.getMissingDigest());
+ responseObserver.onError(notFoundError(e.getMissingDigest()));
+ } catch (StatusRuntimeException e) {
+ responseObserver.onError(e);
+ } catch (IllegalArgumentException e) {
+ responseObserver.onError(
+ StatusProto.toStatusRuntimeException(
+ Status.newBuilder()
+ .setCode(Code.INVALID_ARGUMENT.getNumber())
+ .setMessage(e.toString())
+ .build()));
+ } catch (Exception e) {
+ StringWriter stringWriter = new StringWriter();
+ e.printStackTrace(new PrintWriter(stringWriter));
+ LOG.log(Level.SEVERE, "Work failed: " + e + stringWriter.toString());
+ responseObserver.onError(internalError(e));
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
- } finally {
- responseObserver.onCompleted();
}
}
}